This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-mosaic.git
The following commit(s) were added to refs/heads/main by this push:
new 63ed438 feat: expose timestamp nanos as Arrow timestamp(ns) (#21)
63ed438 is described below
commit 63ed4383b2da4e6eaf4f933a7732e70f9810f979
Author: QuakeWang <[email protected]>
AuthorDate: Thu May 21 09:15:13 2026 +0800
feat: expose timestamp nanos as Arrow timestamp(ns) (#21)
---
core/src/bucket_reader.rs | 85 ++++--
core/src/bucket_writer.rs | 146 ++++++++--
core/src/reader_tests.rs | 313 +++++++++++++++++++--
core/src/stats.rs | 27 ++
core/src/types.rs | 60 ++--
core/tests/robustness_test.rs | 47 ++--
core/tests/stress_test.rs | 39 +--
cpp/test_mosaic.cpp | 49 +++-
docs/design.html | 9 +
docs/java-api.html | 8 +-
docs/python-api.html | 6 +
.../apache/paimon/mosaic/MosaicRoundtripTest.java | 49 ++++
python/mosaic/_ffi.py | 8 +-
python/tests/test_mosaic.py | 39 +++
14 files changed, 732 insertions(+), 153 deletions(-)
diff --git a/core/src/bucket_reader.rs b/core/src/bucket_reader.rs
index 495cae1..115bae4 100644
--- a/core/src/bucket_reader.rs
+++ b/core/src/bucket_reader.rs
@@ -56,10 +56,8 @@ fn data_variant_for_type(dt: &DataType) -> DataVariant {
DataVariant::Binary
}
}
+ dt if types::is_timestamp_nanos(dt) => DataVariant::TimestampNanos,
DataType::Timestamp(_, _) => DataVariant::Int64,
- DataType::Struct(fields) if types::is_timestamp_nanos_struct(fields)
=> {
- DataVariant::TimestampNanos
- }
_ => DataVariant::Binary,
}
}
@@ -104,13 +102,11 @@ fn empty_raw_data_for_type(dt: &DataType) ->
RawColumnData {
}
}
}
+ dt if types::is_timestamp_nanos(dt) => RawColumnData::TimestampNanos {
+ millis: Vec::new(),
+ nanos_of_milli: Vec::new(),
+ },
DataType::Timestamp(_, _) => RawColumnData::Int64(Vec::new()),
- DataType::Struct(fields) if types::is_timestamp_nanos_struct(fields)
=> {
- RawColumnData::TimestampNanos {
- millis: Vec::new(),
- nanos_of_milli: Vec::new(),
- }
- }
_ => RawColumnData::Binary {
offsets: vec![0],
data: Vec::new(),
@@ -191,10 +187,10 @@ fn build_array(
dt: &DataType,
null_bitmap: Option<Vec<u8>>,
num_rows: usize,
-) -> ArrayRef {
+) -> io::Result<ArrayRef> {
let null_buf = make_null_buffer(null_bitmap.clone(), num_rows);
- match data {
+ Ok(match data {
RawColumnData::Boolean(values) => {
let bool_buf = BooleanBuffer::new(Buffer::from_vec(values), 0,
num_rows);
Arc::new(BooleanArray::new(bool_buf, null_buf))
@@ -307,19 +303,43 @@ fn build_array(
} => {
let millis_scattered = scatter_fixed(millis, &null_bitmap,
num_rows);
let nanos_scattered = scatter_fixed(nanos_of_milli, &null_bitmap,
num_rows);
- let millis_array = Arc::new(Int64Array::from(millis_scattered)) as
ArrayRef;
- let nanos_array = Arc::new(Int32Array::from(nanos_scattered)) as
ArrayRef;
- let fields = vec![
- Field::new("millis", DataType::Int64, false),
- Field::new("nanos_of_milli", DataType::Int32, false),
- ];
- Arc::new(StructArray::new(
- fields.into(),
- vec![millis_array, nanos_array],
- null_buf,
- ))
+
+ match dt {
+ DataType::Timestamp(TimeUnit::Nanosecond, tz) => {
+ let values = millis_scattered
+ .into_iter()
+ .zip(nanos_scattered)
+ .map(|(millis, nanos)|
types::millis_nanos_to_ns(millis, nanos))
+ .collect::<io::Result<Vec<_>>>()?;
+ let arr =
TimestampNanosecondArray::new(ScalarBuffer::from(values), null_buf);
+ Arc::new(if let Some(tz) = tz {
+ arr.with_timezone(tz.clone())
+ } else {
+ arr
+ })
+ }
+ DataType::Struct(fields) if
types::is_timestamp_nanos_struct(fields) => {
+ let millis_array =
Arc::new(Int64Array::from(millis_scattered)) as ArrayRef;
+ let nanos_array =
Arc::new(Int32Array::from(nanos_scattered)) as ArrayRef;
+ let fields = vec![
+ Field::new("millis", DataType::Int64, false),
+ Field::new("nanos_of_milli", DataType::Int32, false),
+ ];
+ Arc::new(StructArray::new(
+ fields.into(),
+ vec![millis_array, nanos_array],
+ null_buf,
+ ))
+ }
+ _ => {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidData,
+ format!("timestamp nanos data for non-nanos type:
{:?}", dt),
+ ));
+ }
+ }
}
- }
+ })
}
pub struct BucketReader {
@@ -536,7 +556,12 @@ impl BucketReader {
_ => empty_raw_data_for_type(&self.col_types[i]),
};
- result.push(build_array(data, &self.col_types[i], null_bitmap,
num_rows));
+ result.push(build_array(
+ data,
+ &self.col_types[i],
+ null_bitmap,
+ num_rows,
+ )?);
}
Ok(result)
@@ -788,7 +813,7 @@ impl ColumnPageReader {
_ => empty_raw_data_for_type(&self.col_type),
};
- Ok(build_array(data, &self.col_type, null_bitmap, num_rows))
+ build_array(data, &self.col_type, null_bitmap, num_rows)
}
}
@@ -824,6 +849,16 @@ pub(crate) fn read_typed_value(dt: &DataType, buf: &[u8],
pos: usize, width: i32
DataType::Decimal128(_, _) => Value::DecimalCompact(read_i64(buf,
pos)),
DataType::Timestamp(TimeUnit::Millisecond, _) =>
Value::TimestampMillis(read_i64(buf, pos)),
DataType::Timestamp(TimeUnit::Microsecond, _) =>
Value::TimestampMicros(read_i64(buf, pos)),
+ DataType::Timestamp(TimeUnit::Nanosecond, _) => {
+ debug_assert_eq!(width, 12);
+ let millis = read_i64(buf, pos);
+ let nanos =
+ i32::from_be_bytes([buf[pos + 8], buf[pos + 9], buf[pos + 10],
buf[pos + 11]]);
+ Value::TimestampNanos {
+ millis,
+ nanos_of_milli: nanos,
+ }
+ }
DataType::Struct(fields) if types::is_timestamp_nanos_struct(fields)
=> {
debug_assert_eq!(width, 12);
let millis = read_i64(buf, pos);
diff --git a/core/src/bucket_writer.rs b/core/src/bucket_writer.rs
index 699095c..501e6ba 100644
--- a/core/src/bucket_writer.rs
+++ b/core/src/bucket_writer.rs
@@ -144,7 +144,8 @@ impl BucketWriter {
let typed = downcast_array(array, dt)?;
if array.null_count() == 0 {
- if let Some(col_size) = self.append_no_null_batch(col, &typed,
start_row, num_new_rows)
+ if let Some(col_size) =
+ self.append_no_null_batch(col, &typed, start_row,
num_new_rows)?
{
return Ok(col_size);
}
@@ -158,7 +159,7 @@ impl BucketWriter {
} else {
self.non_null_counts[col] += 1;
let before = self.value_buffers[col].len();
- write_typed_value(&mut self.value_buffers[col], &typed, row);
+ write_typed_value(&mut self.value_buffers[col], &typed, row)?;
let written = self.value_buffers[col].len() - before;
col_size += written;
@@ -207,7 +208,7 @@ impl BucketWriter {
typed: &TypedArrayRef,
start_row: usize,
num_rows: usize,
- ) -> Option<usize> {
+ ) -> io::Result<Option<usize>> {
let buf = &mut self.value_buffers[col];
let before_all = buf.len();
@@ -295,16 +296,22 @@ impl BucketWriter {
buf.extend_from_slice(&v.to_be_bytes());
}
}
- TypedArrayRef::TimestampNanos { millis, nanos } => {
+ TypedArrayRef::TimestampNanos(a) => {
+ let vals = a.values();
+ buf.reserve(num_rows * 12);
+ for &v in vals.iter() {
+ write_timestamp_nanos_value(buf, v);
+ }
+ }
+ TypedArrayRef::LegacyTimestampNanos { millis, nanos } => {
let m_vals = millis.values();
let n_vals = nanos.values();
buf.reserve(num_rows * 12);
for i in 0..num_rows {
- buf.extend_from_slice(&m_vals[i].to_be_bytes());
- buf.extend_from_slice(&n_vals[i].to_be_bytes());
+ write_legacy_timestamp_nanos_value(buf, m_vals[i],
n_vals[i])?;
}
}
- _ => return None,
+ _ => return Ok(None),
}
let col_size = buf.len() - before_all;
@@ -342,12 +349,28 @@ impl BucketWriter {
break;
}
}
+ } else if let Some(ref mut dict) = self.byte_dict_maps[col] {
+ for i in 0..num_rows {
+ let start = before_all + i * fw;
+ let slice = &buf[start..start + fw];
+ if !dict.contains_key(slice) {
+ let len = dict.len();
+ dict.insert(slice.to_vec(), len);
+ self.dict_total_bytes[col] += fw;
+ }
+ if dict.len() > self.max_dict_entries
+ || self.dict_total_bytes[col] > self.max_dict_total_bytes
+ {
+ self.byte_dict_maps[col] = None;
+ break;
+ }
+ }
}
// null_bitmap stays all-zero (no nulls), start_row offsets are fine
let _ = start_row;
- Some(col_size)
+ Ok(Some(col_size))
}
fn compute_encodings(&self) -> (Vec<u8>, Vec<bool>) {
@@ -688,7 +711,8 @@ enum TypedArrayRef<'a> {
Decimal128Large(&'a Decimal128Array),
TimestampMillis(&'a TimestampMillisecondArray),
TimestampMicros(&'a TimestampMicrosecondArray),
- TimestampNanos {
+ TimestampNanos(&'a TimestampNanosecondArray),
+ LegacyTimestampNanos {
millis: &'a Int64Array,
nanos: &'a Int32Array,
},
@@ -768,24 +792,30 @@ fn downcast_array<'a>(array: &'a dyn Array, dt:
&DataType) -> io::Result<TypedAr
.ok_or_else(|| cast_err(dt))?,
))
}
+ DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, _) => {
+ Ok(TypedArrayRef::TimestampNanos(
+ any.downcast_ref::<TimestampNanosecondArray>()
+ .ok_or_else(|| cast_err(dt))?,
+ ))
+ }
DataType::Struct(fields) if types::is_timestamp_nanos_struct(fields)
=> {
let s = any
.downcast_ref::<StructArray>()
.ok_or_else(|| cast_err(dt))?;
let ts_dt = DataType::Int64;
let ns_dt = DataType::Int32;
- Ok(TypedArrayRef::TimestampNanos {
- millis: s
- .column(0)
- .as_any()
- .downcast_ref::<Int64Array>()
- .ok_or_else(|| cast_err(&ts_dt))?,
- nanos: s
- .column(1)
- .as_any()
- .downcast_ref::<Int32Array>()
- .ok_or_else(|| cast_err(&ns_dt))?,
- })
+ let millis = s
+ .column(0)
+ .as_any()
+ .downcast_ref::<Int64Array>()
+ .ok_or_else(|| cast_err(&ts_dt))?;
+ let nanos = s
+ .column(1)
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .ok_or_else(|| cast_err(&ns_dt))?;
+ validate_legacy_timestamp_nanos(s, millis, nanos)?;
+ Ok(TypedArrayRef::LegacyTimestampNanos { millis, nanos })
}
_ => Err(io::Error::new(
io::ErrorKind::InvalidInput,
@@ -795,7 +825,7 @@ fn downcast_array<'a>(array: &'a dyn Array, dt: &DataType)
-> io::Result<TypedAr
}
#[inline]
-fn write_typed_value(buf: &mut Vec<u8>, typed: &TypedArrayRef, row: usize) {
+fn write_typed_value(buf: &mut Vec<u8>, typed: &TypedArrayRef, row: usize) ->
io::Result<()> {
match typed {
TypedArrayRef::Boolean(a) => buf.push(if a.value(row) { 1 } else { 0
}),
TypedArrayRef::Int8(a) => buf.push(a.value(row) as u8),
@@ -826,11 +856,57 @@ fn write_typed_value(buf: &mut Vec<u8>, typed:
&TypedArrayRef, row: usize) {
}
TypedArrayRef::TimestampMillis(a) =>
buf.extend_from_slice(&a.value(row).to_be_bytes()),
TypedArrayRef::TimestampMicros(a) =>
buf.extend_from_slice(&a.value(row).to_be_bytes()),
- TypedArrayRef::TimestampNanos { millis, nanos } => {
- buf.extend_from_slice(&millis.value(row).to_be_bytes());
- buf.extend_from_slice(&nanos.value(row).to_be_bytes());
+ TypedArrayRef::TimestampNanos(a) => write_timestamp_nanos_value(buf,
a.value(row)),
+ TypedArrayRef::LegacyTimestampNanos { millis, nanos } => {
+ write_legacy_timestamp_nanos_value(buf, millis.value(row),
nanos.value(row))?;
+ }
+ }
+ Ok(())
+}
+
+fn validate_legacy_timestamp_nanos(
+ parent: &StructArray,
+ millis: &Int64Array,
+ nanos: &Int32Array,
+) -> io::Result<()> {
+ for row in 0..parent.len() {
+ if parent.is_null(row) {
+ continue;
}
+ if millis.is_null(row) || nanos.is_null(row) {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidInput,
+ "legacy timestamp nanos has null child for a non-null parent
row",
+ ));
+ }
+ validate_timestamp_nanos_pair(millis.value(row), nanos.value(row))?;
}
+ Ok(())
+}
+
+#[inline]
+fn write_timestamp_nanos_value(buf: &mut Vec<u8>, ns: i64) {
+ let (millis, nanos) = types::ns_to_millis_nanos(ns);
+ buf.extend_from_slice(&millis.to_be_bytes());
+ buf.extend_from_slice(&nanos.to_be_bytes());
+}
+
+#[inline]
+fn write_legacy_timestamp_nanos_value(
+ buf: &mut Vec<u8>,
+ millis: i64,
+ nanos: i32,
+) -> io::Result<()> {
+ validate_timestamp_nanos_pair(millis, nanos)?;
+ buf.extend_from_slice(&millis.to_be_bytes());
+ buf.extend_from_slice(&nanos.to_be_bytes());
+ Ok(())
+}
+
+fn validate_timestamp_nanos_pair(millis: i64, nanos: i32) -> io::Result<()> {
+ types::millis_nanos_to_ns(millis, nanos)
+ .map(|_| ())
+ .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput,
err.to_string()))
}
fn i128_to_biginteger_bytes(val: i128) -> Vec<u8> {
@@ -1033,6 +1109,26 @@ mod tests {
assert_eq!(data[0] & 0x03, ENCODING_DICT);
}
+ #[test]
+ fn test_timestamp_nanos_byte_dict_after_no_null_batch() {
+ let types = [DataType::Timestamp(
+ arrow_schema::TimeUnit::Nanosecond,
+ None,
+ )];
+ let type_refs: Vec<&DataType> = types.iter().collect();
+ let mut writer = BucketWriter::new(&type_refs, 32768, 255);
+
+ let first = TimestampNanosecondArray::from(vec![Some(1), None,
Some(2)]);
+ writer.write_columns(&[&first], &[&types[0]]).unwrap();
+
+ let second_values: Vec<i64> = (0..120).map(|i| 3 + (i % 3) as
i64).collect();
+ let second = TimestampNanosecondArray::from(second_values);
+ writer.write_columns(&[&second], &[&types[0]]).unwrap();
+
+ let data = writer.finish();
+ assert_eq!(data[0] & 0x03, ENCODING_DICT);
+ }
+
#[test]
fn test_multi_column_mixed_encodings() {
let types = [DataType::Int32, DataType::Utf8, DataType::Int64];
diff --git a/core/src/reader_tests.rs b/core/src/reader_tests.rs
index 3bdacda..55eecee 100644
--- a/core/src/reader_tests.rs
+++ b/core/src/reader_tests.rs
@@ -30,6 +30,18 @@ fn columns_to_arrow_schema(columns: &[(String, DataType,
bool)]) -> Schema {
)
}
+fn legacy_timestamp_nanos_fields() -> arrow_schema::Fields {
+ vec![
+ Field::new("millis", DataType::Int64, false),
+ Field::new("nanos_of_milli", DataType::Int32, false),
+ ]
+ .into()
+}
+
+fn legacy_timestamp_nanos_type() -> DataType {
+ DataType::Struct(legacy_timestamp_nanos_fields())
+}
+
struct ByteArrayInputFile {
data: Vec<u8>,
}
@@ -263,6 +275,25 @@ fn build_array_from_values(rows: &[Vec<Value>], col:
usize, dt: &DataType) -> Ar
arr
})
}
+ DataType::Timestamp(TimeUnit::Nanosecond, tz) => {
+ let vals: Vec<Option<i64>> = rows
+ .iter()
+ .map(|row| match &row[col] {
+ Value::TimestampNanos {
+ millis,
+ nanos_of_milli,
+ } => Some(crate::types::millis_nanos_to_ns(*millis,
*nanos_of_milli).unwrap()),
+ Value::Null => None,
+ _ => panic!("type mismatch"),
+ })
+ .collect();
+ let arr = TimestampNanosecondArray::from(vals);
+ Arc::new(if let Some(tz) = tz {
+ arr.with_timezone(tz.clone())
+ } else {
+ arr
+ })
+ }
DataType::Struct(struct_fields)
if crate::types::is_timestamp_nanos_struct(struct_fields) =>
{
@@ -698,20 +729,12 @@ fn test_roundtrip_all_types() {
let c_ts_high = batch
.column(col("f_timestamp_high"))
.as_any()
- .downcast_ref::<StructArray>()
- .unwrap();
- let millis_arr = c_ts_high
- .column(0)
- .as_any()
- .downcast_ref::<Int64Array>()
- .unwrap();
- let nanos_arr = c_ts_high
- .column(1)
- .as_any()
- .downcast_ref::<Int32Array>()
+ .downcast_ref::<TimestampNanosecondArray>()
.unwrap();
- assert_eq!(millis_arr.value(0), 1700000000000i64);
- assert_eq!(nanos_arr.value(0), 123456);
+ assert_eq!(
+ c_ts_high.value(0),
+ crate::types::millis_nanos_to_ns(1700000000000, 123456).unwrap()
+ );
}
fn write_and_read(
@@ -1890,6 +1913,251 @@ fn test_timestamp_ltz_roundtrip() {
assert_eq!(ts1.timezone().unwrap(), "America/New_York");
}
+#[test]
+fn test_timestamp_nanos_roundtrip() {
+ let columns = vec![(
+ "ts_nanos".to_string(),
+ DataType::Timestamp(TimeUnit::Nanosecond, None),
+ true,
+ )];
+ let rows = vec![
+ vec![Value::TimestampNanos {
+ millis: 1700000000000,
+ nanos_of_milli: 123456,
+ }],
+ vec![Value::Null],
+ vec![Value::TimestampNanos {
+ millis: -1,
+ nanos_of_milli: 999999,
+ }],
+ vec![Value::TimestampNanos {
+ millis: 0,
+ nanos_of_milli: 0,
+ }],
+ ];
+ let (reader, _) = write_and_read(columns, &rows);
+
+ assert_eq!(
+ reader.schema().columns[0].data_type,
+ DataType::Timestamp(TimeUnit::Nanosecond, None)
+ );
+
+ let mut rg = reader.row_group_reader(0).unwrap();
+ let batch = rg.read_columns().unwrap();
+ let ts = batch
+ .column(0)
+ .as_any()
+ .downcast_ref::<TimestampNanosecondArray>()
+ .unwrap();
+ assert_eq!(
+ ts.value(0),
+ crate::types::millis_nanos_to_ns(1700000000000, 123456).unwrap()
+ );
+ assert!(ts.is_null(1));
+ assert_eq!(ts.value(2), -1);
+ assert_eq!(ts.value(3), 0);
+}
+
+#[test]
+fn test_timestamp_ltz_nanos_roundtrip() {
+ let columns = vec![(
+ "ts_ltz_nanos".to_string(),
+ DataType::Timestamp(TimeUnit::Nanosecond,
Some("Asia/Shanghai".into())),
+ true,
+ )];
+ let rows = vec![
+ vec![Value::TimestampNanos {
+ millis: 1700000000000,
+ nanos_of_milli: 42,
+ }],
+ vec![Value::Null],
+ ];
+ let (reader, _) = write_and_read(columns, &rows);
+
+ assert_eq!(
+ reader.schema().columns[0].data_type,
+ DataType::Timestamp(TimeUnit::Nanosecond, Some("Asia/Shanghai".into()))
+ );
+
+ let mut rg = reader.row_group_reader(0).unwrap();
+ let batch = rg.read_columns().unwrap();
+ let ts = batch
+ .column(0)
+ .as_any()
+ .downcast_ref::<TimestampNanosecondArray>()
+ .unwrap();
+ assert_eq!(ts.timezone().unwrap(), "Asia/Shanghai");
+ assert_eq!(
+ ts.value(0),
+ crate::types::millis_nanos_to_ns(1700000000000, 42).unwrap()
+ );
+ assert!(ts.is_null(1));
+}
+
+#[test]
+fn test_legacy_timestamp_nanos_struct_writes_as_arrow_nanos() {
+ let columns = vec![("ts".to_string(), legacy_timestamp_nanos_type(),
true)];
+ let rows = vec![vec![Value::TimestampNanos {
+ millis: 1700000000000,
+ nanos_of_milli: 123456,
+ }]];
+ let (reader, _) = write_and_read(columns, &rows);
+
+ assert_eq!(
+ reader.schema().columns[0].data_type,
+ DataType::Timestamp(TimeUnit::Nanosecond, None)
+ );
+
+ let mut rg = reader.row_group_reader(0).unwrap();
+ let batch = rg.read_columns().unwrap();
+ let ts = batch
+ .column(0)
+ .as_any()
+ .downcast_ref::<TimestampNanosecondArray>()
+ .unwrap();
+ assert_eq!(
+ ts.value(0),
+ crate::types::millis_nanos_to_ns(1700000000000, 123456).unwrap()
+ );
+}
+
+#[test]
+fn test_legacy_timestamp_nanos_struct_rejects_child_null() {
+ let legacy_fields: arrow_schema::Fields = vec![
+ Field::new("millis", DataType::Int64, true),
+ Field::new("nanos_of_milli", DataType::Int32, true),
+ ]
+ .into();
+ let legacy_dt = DataType::Struct(legacy_fields.clone());
+ let schema = Schema::new(vec![Field::new("ts", legacy_dt.clone(), true)]);
+ let struct_array = StructArray::new(
+ legacy_fields,
+ vec![
+ Arc::new(Int64Array::from(vec![Some(0)])) as ArrayRef,
+ Arc::new(Int32Array::from(vec![None])) as ArrayRef,
+ ],
+ Some(arrow_buffer::NullBuffer::from(vec![true])),
+ );
+ let batch = RecordBatch::try_new(Arc::new(schema),
vec![Arc::new(struct_array)]).unwrap();
+ let out = MemOutputFile::new();
+ let mut writer = MosaicWriter::new(
+ out,
+ &Schema::new(vec![Field::new("ts", legacy_dt, true)]),
+ WriterOptions::default(),
+ )
+ .unwrap();
+
+ let err = writer.write_batch(&batch).unwrap_err();
+ assert!(err.to_string().contains("null child"));
+}
+
+#[test]
+fn test_legacy_timestamp_nanos_struct_rejects_invalid_nanos() {
+ let legacy_dt = legacy_timestamp_nanos_type();
+ let schema = Schema::new(vec![Field::new("ts", legacy_dt.clone(), true)]);
+ let fields = legacy_timestamp_nanos_fields();
+ let struct_array = StructArray::new(
+ fields,
+ vec![
+ Arc::new(Int64Array::from(vec![Some(0)])) as ArrayRef,
+ Arc::new(Int32Array::from(vec![Some(1_000_000)])) as ArrayRef,
+ ],
+ Some(arrow_buffer::NullBuffer::from(vec![true])),
+ );
+ let batch = RecordBatch::try_new(Arc::new(schema),
vec![Arc::new(struct_array)]).unwrap();
+ let out = MemOutputFile::new();
+ let mut writer = MosaicWriter::new(
+ out,
+ &Schema::new(vec![Field::new("ts", legacy_dt, true)]),
+ WriterOptions::default(),
+ )
+ .unwrap();
+
+ let err = writer.write_batch(&batch).unwrap_err();
+ assert!(err.to_string().contains("invalid nanos_of_milli"));
+}
+
+#[test]
+fn test_legacy_timestamp_nanos_struct_rejects_overflow() {
+ let legacy_dt = legacy_timestamp_nanos_type();
+ let schema = Schema::new(vec![Field::new("ts", legacy_dt.clone(), true)]);
+ let fields = legacy_timestamp_nanos_fields();
+ let struct_array = StructArray::new(
+ fields,
+ vec![
+ Arc::new(Int64Array::from(vec![Some(i64::MAX)])) as ArrayRef,
+ Arc::new(Int32Array::from(vec![Some(0)])) as ArrayRef,
+ ],
+ Some(arrow_buffer::NullBuffer::from(vec![true])),
+ );
+ let batch = RecordBatch::try_new(Arc::new(schema),
vec![Arc::new(struct_array)]).unwrap();
+ let out = MemOutputFile::new();
+ let mut writer = MosaicWriter::new(
+ out,
+ &Schema::new(vec![Field::new("ts", legacy_dt, true)]),
+ WriterOptions::default(),
+ )
+ .unwrap();
+
+ let err = writer.write_batch(&batch).unwrap_err();
+ assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
+ assert!(err.to_string().contains("overflow"));
+}
+
+#[test]
+fn test_timestamp_nanos_stats_min_max() {
+ let columns = vec![(
+ "ts".to_string(),
+ DataType::Timestamp(TimeUnit::Nanosecond, None),
+ true,
+ )];
+ let rows = vec![
+ vec![Value::TimestampNanos {
+ millis: -1,
+ nanos_of_milli: 999999,
+ }],
+ vec![Value::Null],
+ vec![Value::TimestampNanos {
+ millis: 0,
+ nanos_of_milli: 42,
+ }],
+ vec![Value::TimestampNanos {
+ millis: -1,
+ nanos_of_milli: 999998,
+ }],
+ ];
+ let out = MemOutputFile::new();
+ let mut writer = MosaicWriter::new(
+ out,
+ &columns_to_arrow_schema(&columns),
+ WriterOptions {
+ stats_columns: vec!["ts".to_string()],
+ ..Default::default()
+ },
+ )
+ .unwrap();
+ write_values(&mut writer, &columns, &rows);
+ writer.close().unwrap();
+
+ let stats = writer.row_group_stats(0);
+ assert_eq!(stats.len(), 1);
+ assert_eq!(stats[0].null_count, 1);
+ assert!(matches!(
+ stats[0].min,
+ Some(Value::TimestampNanos {
+ millis: -1,
+ nanos_of_milli: 999998
+ })
+ ));
+ assert!(matches!(
+ stats[0].max,
+ Some(Value::TimestampNanos {
+ millis: 0,
+ nanos_of_milli: 42
+ })
+ ));
+}
+
#[test]
fn test_timestamp_micros_roundtrip() {
let columns = vec![
@@ -1956,23 +2224,15 @@ fn test_timestamp_micros_roundtrip() {
let ts_nanos = batch
.column(nanos_pos)
.as_any()
- .downcast_ref::<StructArray>()
- .unwrap();
- let millis = ts_nanos
- .column(0)
- .as_any()
- .downcast_ref::<Int64Array>()
- .unwrap();
- let nanos = ts_nanos
- .column(1)
- .as_any()
- .downcast_ref::<Int32Array>()
+ .downcast_ref::<TimestampNanosecondArray>()
.unwrap();
assert_eq!(ts_millis.value(0), 1700000000000i64);
assert_eq!(ts_micros.value(0), 1_700_000_000_000_000i64);
- assert_eq!(millis.value(0), 1700000000000i64);
- assert_eq!(nanos.value(0), 123456);
+ assert_eq!(
+ ts_nanos.value(0),
+ crate::types::millis_nanos_to_ns(1700000000000, 123456).unwrap()
+ );
assert!(ts_millis.is_null(1));
assert!(ts_micros.is_null(1));
@@ -1980,6 +2240,7 @@ fn test_timestamp_micros_roundtrip() {
assert_eq!(ts_millis.value(2), 0);
assert_eq!(ts_micros.value(2), 0);
+ assert_eq!(ts_nanos.value(2), 0);
}
#[test]
diff --git a/core/src/stats.rs b/core/src/stats.rs
index e96181d..860a5c2 100644
--- a/core/src/stats.rs
+++ b/core/src/stats.rs
@@ -245,6 +245,14 @@ fn extract_value_for_stats(array: &dyn Array, row: usize,
dt: &DataType) -> Opti
let a =
array.as_any().downcast_ref::<TimestampMicrosecondArray>()?;
Some(Value::TimestampMicros(a.value(row)))
}
+ DataType::Timestamp(TimeUnit::Nanosecond, _) => {
+ let a = array.as_any().downcast_ref::<TimestampNanosecondArray>()?;
+ let (millis, nanos) = types::ns_to_millis_nanos(a.value(row));
+ Some(Value::TimestampNanos {
+ millis,
+ nanos_of_milli: nanos,
+ })
+ }
DataType::Struct(fields) if types::is_timestamp_nanos_struct(fields)
=> {
let s = array.as_any().downcast_ref::<StructArray>()?;
let millis = s
@@ -438,6 +446,25 @@ fn read_fixed_value(buf: &[u8], pos: usize, dt: &DataType,
width: i32) -> Value
buf[pos + 7],
]))
}
+ DataType::Timestamp(TimeUnit::Nanosecond, _) => {
+ debug_assert_eq!(width, 12);
+ let millis = i64::from_be_bytes([
+ buf[pos],
+ buf[pos + 1],
+ buf[pos + 2],
+ buf[pos + 3],
+ buf[pos + 4],
+ buf[pos + 5],
+ buf[pos + 6],
+ buf[pos + 7],
+ ]);
+ let nanos =
+ i32::from_be_bytes([buf[pos + 8], buf[pos + 9], buf[pos + 10],
buf[pos + 11]]);
+ Value::TimestampNanos {
+ millis,
+ nanos_of_milli: nanos,
+ }
+ }
DataType::Struct(fields) if types::is_timestamp_nanos_struct(fields)
=> {
debug_assert_eq!(width, 12);
let millis = i64::from_be_bytes([
diff --git a/core/src/types.rs b/core/src/types.rs
index 0652680..736c4c5 100644
--- a/core/src/types.rs
+++ b/core/src/types.rs
@@ -15,10 +15,14 @@
// specific language governing permissions and limitations
// under the License.
+use std::io;
+
use arrow_schema::{DataType, Field, Fields, TimeUnit};
use crate::varint;
+pub const NANOS_PER_MILLI: i64 = 1_000_000;
+
pub fn fixed_width(dt: &DataType) -> i32 {
match dt {
DataType::Boolean | DataType::Int8 => 1,
@@ -26,12 +30,18 @@ pub fn fixed_width(dt: &DataType) -> i32 {
DataType::Int32 | DataType::Date32 | DataType::Time32(_) |
DataType::Float32 => 4,
DataType::Int64 | DataType::Float64 => 8,
DataType::Decimal128(p, _) if *p <= 18 => 8,
- DataType::Timestamp(_, _) => 8,
+ DataType::Timestamp(TimeUnit::Millisecond | TimeUnit::Microsecond, _)
=> 8,
+ DataType::Timestamp(TimeUnit::Nanosecond, _) => 12,
DataType::Struct(fields) if is_timestamp_nanos_struct(fields) => 12,
_ => -1,
}
}
+pub fn is_timestamp_nanos(dt: &DataType) -> bool {
+ matches!(dt, DataType::Timestamp(TimeUnit::Nanosecond, _))
+ || matches!(dt, DataType::Struct(fields) if
is_timestamp_nanos_struct(fields))
+}
+
pub fn is_timestamp_nanos_struct(fields: &Fields) -> bool {
fields.len() == 2
&& fields[0].name() == "millis"
@@ -40,6 +50,34 @@ pub fn is_timestamp_nanos_struct(fields: &Fields) -> bool {
&& *fields[1].data_type() == DataType::Int32
}
+pub fn is_valid_nanos_of_milli(nanos: i32) -> bool {
+ (0..NANOS_PER_MILLI as i32).contains(&nanos)
+}
+
+pub fn ns_to_millis_nanos(ns: i64) -> (i64, i32) {
+ (
+ ns.div_euclid(NANOS_PER_MILLI),
+ ns.rem_euclid(NANOS_PER_MILLI) as i32,
+ )
+}
+
+pub fn millis_nanos_to_ns(millis: i64, nanos: i32) -> io::Result<i64> {
+ if !is_valid_nanos_of_milli(nanos) {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidData,
+ format!("invalid nanos_of_milli: {}", nanos),
+ ));
+ }
+ let ns = millis as i128 * NANOS_PER_MILLI as i128 + nanos as i128;
+ if ns < i64::MIN as i128 || ns > i64::MAX as i128 {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidData,
+ "timestamp ns overflow",
+ ));
+ }
+ Ok(ns as i64)
+}
+
pub fn validate_data_type(dt: &DataType) -> Result<(), String> {
match dt {
DataType::Boolean
@@ -61,7 +99,7 @@ pub fn validate_data_type(dt: &DataType) -> Result<(),
String> {
}
}
DataType::Timestamp(unit, _) => match unit {
- TimeUnit::Millisecond | TimeUnit::Microsecond => Ok(()),
+ TimeUnit::Millisecond | TimeUnit::Microsecond |
TimeUnit::Nanosecond => Ok(()),
_ => Err(format!("unsupported Timestamp unit: {:?}", unit)),
},
DataType::Struct(fields) if is_timestamp_nanos_struct(fields) =>
Ok(()),
@@ -95,6 +133,7 @@ pub fn precision_of(dt: &DataType) -> u32 {
DataType::Decimal128(p, _) => *p as u32,
DataType::Timestamp(TimeUnit::Millisecond, _) => 3,
DataType::Timestamp(TimeUnit::Microsecond, _) => 6,
+ DataType::Timestamp(TimeUnit::Nanosecond, _) => 9,
DataType::Struct(fields) if is_timestamp_nanos_struct(fields) => 9,
DataType::Time32(TimeUnit::Millisecond) => 3,
_ => 0,
@@ -125,6 +164,7 @@ pub fn serialize_field(field: &Field, buf: &mut Vec<u8>) {
let p = match unit {
TimeUnit::Millisecond => 3u32,
TimeUnit::Microsecond => 6u32,
+ TimeUnit::Nanosecond => 9u32,
_ => 0,
};
varint::encode(buf, p);
@@ -190,13 +230,7 @@ pub fn deserialize_field(name: &str, buf: &[u8], pos: &mut
usize) -> Result<Fiel
} else if precision <= 6 {
DataType::Timestamp(TimeUnit::Microsecond, None)
} else {
- DataType::Struct(
- vec![
- Field::new("millis", DataType::Int64, false),
- Field::new("nanos_of_milli", DataType::Int32, false),
- ]
- .into(),
- )
+ DataType::Timestamp(TimeUnit::Nanosecond, None)
}
}
17 => {
@@ -221,13 +255,7 @@ pub fn deserialize_field(name: &str, buf: &[u8], pos: &mut
usize) -> Result<Fiel
} else if precision <= 6 {
DataType::Timestamp(TimeUnit::Microsecond, Some(tz))
} else {
- DataType::Struct(
- vec![
- Field::new("millis", DataType::Int64, false),
- Field::new("nanos_of_milli", DataType::Int32, false),
- ]
- .into(),
- )
+ DataType::Timestamp(TimeUnit::Nanosecond, Some(tz))
}
}
_ => {
diff --git a/core/tests/robustness_test.rs b/core/tests/robustness_test.rs
index 7e2e15c..08e5a27 100644
--- a/core/tests/robustness_test.rs
+++ b/core/tests/robustness_test.rs
@@ -1653,7 +1653,7 @@ fn test_decimal128_extreme_values() {
}
}
-// ======================== TimestampNanos (Struct) Tests
========================
+// ======================== TimestampNanos Tests ========================
#[test]
fn test_timestamp_nanos_basic_roundtrip() {
@@ -1707,19 +1707,22 @@ fn test_timestamp_nanos_basic_roundtrip() {
let col = result[0]
.column(0)
.as_any()
- .downcast_ref::<StructArray>()
+ .downcast_ref::<TimestampNanosecondArray>()
.unwrap();
- let millis_col =
col.column(0).as_any().downcast_ref::<Int64Array>().unwrap();
- let nanos_col =
col.column(1).as_any().downcast_ref::<Int32Array>().unwrap();
-
for i in 0..num_rows {
if i % 11 == 0 {
assert!(col.is_null(i), "expected null at row {}", i);
} else {
assert!(!col.is_null(i));
- assert_eq!(millis_col.value(i), 1_700_000_000_000i64 + i as i64);
- assert_eq!(nanos_col.value(i), (i % 1_000_000) as i32);
+ assert_eq!(
+ col.value(i),
+ mosaic_core::types::millis_nanos_to_ns(
+ 1_700_000_000_000i64 + i as i64,
+ (i % 1_000_000) as i32,
+ )
+ .unwrap()
+ );
}
}
}
@@ -1738,20 +1741,19 @@ fn test_timestamp_nanos_extreme_values() {
false,
)]);
- // Extreme millisecond values and nanos boundary values
- let millis_data: Vec<i64> = vec![
+ let ns_data: Vec<i64> = vec![
0,
i64::MAX,
i64::MIN,
- 1_700_000_000_000,
- -1_700_000_000_000,
+ mosaic_core::types::millis_nanos_to_ns(1_700_000_000_000,
999_999).unwrap(),
+ mosaic_core::types::millis_nanos_to_ns(-1_700_000_000_000, 1).unwrap(),
1,
-1,
];
- let nanos_data: Vec<i32> = vec![
- 0, 999_999, // max valid nanos_of_milli
- 0, 500_000, 999_999, 1, 0,
- ];
+ let (millis_data, nanos_data): (Vec<i64>, Vec<i32>) = ns_data
+ .iter()
+ .map(|&ns| mosaic_core::types::ns_to_millis_nanos(ns))
+ .unzip();
let millis_arr = Int64Array::from(millis_data.clone());
let nanos_arr = Int32Array::from(nanos_data.clone());
@@ -1768,20 +1770,11 @@ fn test_timestamp_nanos_extreme_values() {
let col = result[0]
.column(0)
.as_any()
- .downcast_ref::<StructArray>()
+ .downcast_ref::<TimestampNanosecondArray>()
.unwrap();
- let millis_col =
col.column(0).as_any().downcast_ref::<Int64Array>().unwrap();
- let nanos_col =
col.column(1).as_any().downcast_ref::<Int32Array>().unwrap();
-
- for i in 0..millis_data.len() {
- assert_eq!(
- millis_col.value(i),
- millis_data[i],
- "millis mismatch at {}",
- i
- );
- assert_eq!(nanos_col.value(i), nanos_data[i], "nanos mismatch at {}",
i);
+ for (i, expected) in ns_data.iter().enumerate() {
+ assert_eq!(col.value(i), *expected, "timestamp ns mismatch at {}", i);
}
}
diff --git a/core/tests/stress_test.rs b/core/tests/stress_test.rs
index 69e3204..37d0289 100644
--- a/core/tests/stress_test.rs
+++ b/core/tests/stress_test.rs
@@ -27,7 +27,7 @@ use std::io;
use std::sync::Arc;
use arrow_array::*;
-use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit};
+use arrow_schema::{DataType, Field, Schema, TimeUnit};
use mosaic_core::reader::{InputFile, MosaicReader, ReaderAccess};
use mosaic_core::writer::{MosaicWriter, OutputFile, WriterOptions};
@@ -289,10 +289,6 @@ fn test_wide_table_200_columns() {
#[test]
fn test_all_types_at_scale() {
let num_rows = 500_000;
- let ts_nanos_fields = Fields::from(vec![
- Field::new("millis", DataType::Int64, false),
- Field::new("nanos_of_milli", DataType::Int32, false),
- ]);
let schema = Schema::new(vec![
Field::new("bool_col", DataType::Boolean, true),
Field::new("i8_col", DataType::Int8, true),
@@ -316,7 +312,11 @@ fn test_all_types_at_scale() {
DataType::Timestamp(TimeUnit::Microsecond, None),
true,
),
- Field::new("ts_ns_col", DataType::Struct(ts_nanos_fields.clone()),
true),
+ Field::new(
+ "ts_ns_col",
+ DataType::Timestamp(TimeUnit::Nanosecond, None),
+ true,
+ ),
]);
let batch_size = 50_000;
@@ -476,35 +476,18 @@ fn test_all_types_at_scale() {
})
.collect();
- // Timestamp nanos as struct
- let millis_vals: Vec<Option<i64>> = (0..count)
- .map(|i| {
- if (batch_start + i) % 21 == 0 {
- None
- } else {
- Some(1_700_000_000_000i64 + (batch_start + i) as i64)
- }
- })
- .collect();
- let nanos_vals: Vec<Option<i32>> = (0..count)
+ let ts_ns: Vec<Option<i64>> = (0..count)
.map(|i| {
if (batch_start + i) % 21 == 0 {
None
} else {
- Some(((batch_start + i) % 1_000_000) as i32)
+ let millis = 1_700_000_000_000i64 + (batch_start + i) as
i64;
+ let nanos = ((batch_start + i) % 1_000_000) as i32;
+ Some(mosaic_core::types::millis_nanos_to_ns(millis,
nanos).unwrap())
}
})
.collect();
- let millis_arr = Int64Array::from(millis_vals);
- let nanos_arr = Int32Array::from(nanos_vals);
- let null_buf = millis_arr.nulls().cloned();
- let ts_ns_struct = StructArray::new(
- ts_nanos_fields.clone(),
- vec![Arc::new(millis_arr), Arc::new(nanos_arr)],
- null_buf,
- );
-
let batch = RecordBatch::try_new(
Arc::new(schema.clone()),
vec![
@@ -526,7 +509,7 @@ fn test_all_types_at_scale() {
),
Arc::new(TimestampMillisecondArray::from(ts_ms)),
Arc::new(TimestampMicrosecondArray::from(ts_us)),
- Arc::new(ts_ns_struct),
+ Arc::new(TimestampNanosecondArray::from(ts_ns)),
],
)
.unwrap();
diff --git a/cpp/test_mosaic.cpp b/cpp/test_mosaic.cpp
index 37e8c07..1d08012 100644
--- a/cpp/test_mosaic.cpp
+++ b/cpp/test_mosaic.cpp
@@ -246,6 +246,52 @@ static void test_all_types() {
printf(" PASS test_all_types\n");
}
+static void test_timestamp_ns_roundtrip() {
+ auto ts_ns_type = arrow::timestamp(arrow::TimeUnit::NANO);
+ auto ts_ns_tz_type = arrow::timestamp(arrow::TimeUnit::NANO,
"Asia/Shanghai");
+ auto schema = arrow::schema({
+ arrow::field("ts_ns", ts_ns_type),
+ arrow::field("ts_ns_tz", ts_ns_tz_type),
+ });
+
+ const int64_t values[] = {1700000000000000123LL, -1LL};
+
+ arrow::TimestampBuilder ts_ns_b(ts_ns_type, arrow::default_memory_pool());
+ assert(ts_ns_b.Append(values[0]).ok());
+ assert(ts_ns_b.AppendNull().ok());
+ assert(ts_ns_b.Append(values[1]).ok());
+
+ arrow::TimestampBuilder ts_ns_tz_b(ts_ns_tz_type,
arrow::default_memory_pool());
+ assert(ts_ns_tz_b.Append(values[0]).ok());
+ assert(ts_ns_tz_b.AppendNull().ok());
+ assert(ts_ns_tz_b.Append(values[1]).ok());
+
+ auto batch = arrow::RecordBatch::Make(schema, 3, {
+ ts_ns_b.Finish().ValueUnsafe(),
+ ts_ns_tz_b.Finish().ValueUnsafe(),
+ });
+
+ auto data_vec = write_and_get(schema, batch);
+
+ MemBuffer buf;
+ buf.data = data_vec;
+ auto reader = mosaic::make_reader(make_input(buf), buf.data.size());
+ auto rb = read_row_group(reader, 0);
+
+ ASSERT_TRUE(rb->schema()->field(0)->type()->Equals(ts_ns_type));
+ ASSERT_TRUE(rb->schema()->field(1)->type()->Equals(ts_ns_tz_type));
+
+ auto ts_ns =
std::static_pointer_cast<arrow::TimestampArray>(rb->column(0));
+ auto ts_ns_tz =
std::static_pointer_cast<arrow::TimestampArray>(rb->column(1));
+ ASSERT_EQ(ts_ns->Value(0), values[0]);
+ ASSERT_TRUE(ts_ns->IsNull(1));
+ ASSERT_EQ(ts_ns->Value(2), values[1]);
+ ASSERT_EQ(ts_ns_tz->Value(0), values[0]);
+ ASSERT_TRUE(ts_ns_tz->IsNull(1));
+ ASSERT_EQ(ts_ns_tz->Value(2), values[1]);
+ printf(" PASS test_timestamp_ns_roundtrip\n");
+}
+
static void test_projection() {
auto schema = arrow::schema({
arrow::field("a", arrow::int32()),
@@ -783,6 +829,7 @@ int main() {
test_basic_roundtrip();
test_null_values();
test_all_types();
+ test_timestamp_ns_roundtrip();
test_projection();
test_projection_empty();
test_statistics();
@@ -794,6 +841,6 @@ int main() {
test_writer_stats_all_null();
test_writer_stats_matches_reader();
test_stats_empty_string_min();
- printf("All %d tests passed.\n", 14);
+ printf("All %d tests passed.\n", 15);
return 0;
}
diff --git a/docs/design.html b/docs/design.html
index b534836..af052e8 100644
--- a/docs/design.html
+++ b/docs/design.html
@@ -766,6 +766,15 @@ repeated numColumns times:
<tr><td>BINARY / VARBINARY / BYTES</td><td>varint length +
raw bytes</td></tr>
</tbody>
</table>
+ <p>
+ Readers expose TIMESTAMP columns with precision &gt; 6 as Arrow Timestamp(Nanosecond, timezone).
+ The 12-byte physical encoding is unchanged, but precisions 7 and 8 are
+ normalized to Arrow nanosecond units in the external schema, because Arrow timestamp
+ units cannot carry a separate decimal timestamp precision.
+ Existing 12-byte timestamp values outside Arrow's signed 64-bit epoch-nanosecond
+ range cannot be represented through this Arrow API: readers return InvalidData for
+ such values, and writes through the legacy Struct timestamp representation reject them before writing.
+ </p>
<!-- ============================================================
-->
<h2>Varint Encoding</h2>
diff --git a/docs/java-api.html b/docs/java-api.html
index 0050ad0..d6cd7cb 100644
--- a/docs/java-api.html
+++ b/docs/java-api.html
@@ -108,8 +108,14 @@
<span class="ty">Field</span>.nullable(<span class="str">"name"</span>,
<span class="ty">ArrowType.Utf8</span>.INSTANCE),
<span class="ty">Field</span>.nullable(<span class="str">"score"</span>,
<span class="kw">new</span> <span
class="ty">ArrowType.FloatingPoint</span>(FloatingPointPrecision.DOUBLE)),
<span class="ty">Field</span>.nullable(<span class="str">"amount"</span>,
<span class="kw">new</span> <span class="ty">ArrowType.Decimal</span>(<span
class="num">10</span>, <span class="num">2</span>, <span
class="num">128</span>)),
- <span class="ty">Field</span>.nullable(<span class="str">"ts"</span>,
<span class="kw">new</span> <span
class="ty">ArrowType.Timestamp</span>(TimeUnit.MILLISECOND, <span
class="kw">null</span>))
+ <span class="ty">Field</span>.nullable(<span class="str">"ts"</span>,
<span class="kw">new</span> <span
class="ty">ArrowType.Timestamp</span>(TimeUnit.MILLISECOND, <span
class="kw">null</span>)),
+ <span class="ty">Field</span>.nullable(<span class="str">"ts_ns"</span>,
<span class="kw">new</span> <span
class="ty">ArrowType.Timestamp</span>(TimeUnit.NANOSECOND, <span
class="str">"Asia/Shanghai"</span>))
));</code></pre>
+ <p>
+ Timestamp fields support Arrow millisecond, microsecond, and nanosecond units.
+ Nanosecond timestamps are exposed as standard Arrow <code>TimeUnit.NANOSECOND</code>
+ vectors, while Mosaic keeps its 12-byte physical encoding internally.
+ </p>
<h3>2. Create Writer and Write Batches</h3>
<p>
diff --git a/docs/python-api.html b/docs/python-api.html
index 168d362..d1cd442 100644
--- a/docs/python-api.html
+++ b/docs/python-api.html
@@ -88,7 +88,13 @@ pa_schema = pa.schema([
pa.field(<span class="str">"score"</span>, pa.float64()),
pa.field(<span class="str">"amount"</span>, pa.decimal128(<span
class="num">10</span>, <span class="num">2</span>)),
pa.field(<span class="str">"ts"</span>, pa.timestamp(<span
class="str">"ms"</span>)),
+ pa.field(<span class="str">"ts_ns"</span>, pa.timestamp(<span
class="str">"ns"</span>, tz=<span class="str">"Asia/Shanghai"</span>)),
])</code></pre>
+ <p>
+ Timestamp fields support Arrow millisecond, microsecond, and nanosecond units.
+ Nanosecond timestamps are exposed as standard Arrow <code>timestamp("ns")</code>
+ arrays, while Mosaic keeps its 12-byte physical encoding internally.
+ </p>
<h3>2. Create Writer and Write Batches</h3>
<p>
diff --git
a/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java
b/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java
index 0fc3f88..5344a2d 100644
--- a/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java
+++ b/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java
@@ -33,11 +33,14 @@ import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TimeStampNanoTZVector;
+import org.apache.arrow.vector.TimeStampNanoVector;
import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.FloatingPointPrecision;
+import org.apache.arrow.vector.types.TimeUnit;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
@@ -484,6 +487,52 @@ public class MosaicRoundtripTest {
}
}
+ @Test
+ public void testTimestampNsRoundtrip() {
+ ArrowType.Timestamp tsNsType = new
ArrowType.Timestamp(TimeUnit.NANOSECOND, null);
+ ArrowType.Timestamp tsNsTzType = new
ArrowType.Timestamp(TimeUnit.NANOSECOND, "Asia/Shanghai");
+ Schema arrowSchema = new Schema(Arrays.asList(
+ Field.nullable("ts_ns", tsNsType),
+ Field.nullable("ts_ns_tz", tsNsTzType)
+ ));
+
+ long[] values = {1700000000000000123L, -1L};
+ byte[] data;
+ try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema,
allocator)) {
+ TimeStampNanoVector tsNsVec = (TimeStampNanoVector)
root.getVector("ts_ns");
+ TimeStampNanoTZVector tsNsTzVec = (TimeStampNanoTZVector)
root.getVector("ts_ns_tz");
+ int n = 3;
+ tsNsVec.allocateNew(n);
+ tsNsTzVec.allocateNew(n);
+
+ tsNsVec.set(0, values[0]);
+ tsNsVec.setNull(1);
+ tsNsVec.set(2, values[1]);
+ tsNsTzVec.set(0, values[0]);
+ tsNsTzVec.setNull(1);
+ tsNsTzVec.set(2, values[1]);
+
+ root.setRowCount(n);
+ data = writeToBytes(arrowSchema, writer -> writer.write(root));
+ }
+
+ try (MosaicReader reader = readerFromBytes(data)) {
+ assertEquals(tsNsType,
reader.getSchema().findField("ts_ns").getType());
+ assertEquals(tsNsTzType,
reader.getSchema().findField("ts_ns_tz").getType());
+ try (VectorSchemaRoot batch = reader.readRowGroup(0, allocator)) {
+ TimeStampNanoVector tsNs = (TimeStampNanoVector)
batch.getVector("ts_ns");
+ TimeStampNanoTZVector tsNsTz = (TimeStampNanoTZVector)
batch.getVector("ts_ns_tz");
+
+ assertEquals(values[0], tsNs.get(0));
+ assertTrue(tsNs.isNull(1));
+ assertEquals(values[1], tsNs.get(2));
+ assertEquals(values[0], tsNsTz.get(0));
+ assertTrue(tsNsTz.isNull(1));
+ assertEquals(values[1], tsNsTz.get(2));
+ }
+ }
+ }
+
@Test
public void testCompressionNone() {
Schema arrowSchema = new Schema(Arrays.asList(
diff --git a/python/mosaic/_ffi.py b/python/mosaic/_ffi.py
index 7816413..625369b 100644
--- a/python/mosaic/_ffi.py
+++ b/python/mosaic/_ffi.py
@@ -49,15 +49,15 @@ def _load_library():
else:
lib_name = "libmosaic_ffi.so"
+ env_path = os.environ.get("MOSAIC_LIB_PATH")
search_paths = []
+ if env_path:
+ search_paths.append(env_path)
+
pkg_dir = os.path.dirname(os.path.abspath(__file__))
search_paths.append(pkg_dir)
search_paths.append(os.path.join(pkg_dir, "..", ".."))
- env_path = os.environ.get("MOSAIC_LIB_PATH")
- if env_path:
- search_paths.append(env_path)
-
for rel in [
os.path.join("..", "target", "release"),
os.path.join("..", "target", "debug"),
diff --git a/python/tests/test_mosaic.py b/python/tests/test_mosaic.py
index 50a5fee..b60f0f1 100644
--- a/python/tests/test_mosaic.py
+++ b/python/tests/test_mosaic.py
@@ -176,9 +176,12 @@ class TestRoundtrip:
pa.field("f_decimal", pa.decimal128(10, 2)),
pa.field("f_date", pa.date32()),
pa.field("f_timestamp", pa.timestamp("ms")),
+ pa.field("f_timestamp_ns", pa.timestamp("ns")),
+ pa.field("f_timestamp_ns_tz", pa.timestamp("ns",
tz="Asia/Shanghai")),
]
)
+ ts_ns_values = [1700000000000000123, -1]
batch = pa.record_batch(
[
pa.array([True, False]),
@@ -193,11 +196,14 @@ class TestRoundtrip:
pa.array([1234567, -9876543], type=pa.decimal128(10, 2)),
pa.array([19000, 0], type=pa.date32()),
pa.array([1700000000000, 0], type=pa.timestamp("ms")),
+ pa.array(ts_ns_values, type=pa.timestamp("ns")),
+ pa.array(ts_ns_values, type=pa.timestamp("ns",
tz="Asia/Shanghai")),
],
names=[
"f_bool", "f_int8", "f_int16", "f_int32", "f_int64",
"f_float32", "f_float64", "f_utf8", "f_binary",
"f_decimal", "f_date", "f_timestamp",
+ "f_timestamp_ns", "f_timestamp_ns_tz",
],
)
@@ -223,6 +229,39 @@ class TestRoundtrip:
assert abs(f64[0] - 2.718281828) < 1e-9
assert abs(f64[1] - (-3.141592653)) < 1e-9
+ assert rb.schema.field("f_timestamp_ns").type == pa.timestamp("ns")
+ assert rb.column("f_timestamp_ns").cast(pa.int64()).to_pylist() ==
ts_ns_values
+ assert rb.schema.field("f_timestamp_ns_tz").type == pa.timestamp(
+ "ns", tz="Asia/Shanghai"
+ )
+ assert rb.column("f_timestamp_ns_tz").cast(pa.int64()).to_pylist()
== ts_ns_values
+
+ def test_timestamp_nanos_mixed_batches_updates_dictionary(self):
+ pa_schema = pa.schema([pa.field("ts", pa.timestamp("ns"))])
+ first_values = [1, None, 2]
+ second_values = [3 + (i % 3) for i in range(120)]
+
+ buf = io.BytesIO()
+ with MosaicWriter(buf, pa_schema) as writer:
+ writer.write(
+ pa.record_batch(
+ [pa.array(first_values, type=pa.timestamp("ns"))],
+ names=["ts"],
+ )
+ )
+ writer.write(
+ pa.record_batch(
+ [pa.array(second_values, type=pa.timestamp("ns"))],
+ names=["ts"],
+ )
+ )
+
+ with _reader_from_bytes(buf.getvalue()) as reader:
+ rb = reader.read_row_group(0)
+ assert rb.column("ts").cast(pa.int64()).to_pylist() == (
+ first_values + second_values
+ )
+
def test_multiple_row_groups(self):
pa_schema = pa.schema(
[pa.field("id", pa.int32()), pa.field("data", pa.int64())]