This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 1bc399b test: add it for produce & scan log for all supported datatypes (#205)
1bc399b is described below
commit 1bc399b8ebac2810e4a1d3d1a54f1679e1dac65a
Author: SkylerLin <[email protected]>
AuthorDate: Mon Jan 26 12:12:10 2026 +0800
test: add it for produce & scan log for all supported datatypes (#205)
---
crates/fluss/src/row/column.rs | 164 ++++++++++----
crates/fluss/tests/integration/table.rs | 391 ++++++++++++++++++++++++++++++++
2 files changed, 507 insertions(+), 48 deletions(-)
diff --git a/crates/fluss/src/row/column.rs b/crates/fluss/src/row/column.rs
index 46c25b2..f48075b 100644
--- a/crates/fluss/src/row/column.rs
+++ b/crates/fluss/src/row/column.rs
@@ -17,8 +17,11 @@
use crate::row::InternalRow;
use arrow::array::{
- Array, AsArray, BinaryArray, Decimal128Array, FixedSizeBinaryArray, Float32Array, Float64Array,
- Int8Array, Int16Array, Int32Array, Int64Array, RecordBatch, StringArray,
+ Array, AsArray, BinaryArray, Date32Array, Decimal128Array, FixedSizeBinaryArray, Float32Array,
+ Float64Array, Int8Array, Int16Array, Int32Array, Int64Array, RecordBatch, StringArray,
+ Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray,
+ TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
+ TimestampSecondArray,
};
use arrow::datatypes::{DataType as ArrowDataType, TimeUnit};
use std::sync::Arc;
@@ -67,35 +70,110 @@ impl ColumnarRow {
) -> T {
let schema = self.record_batch.schema();
let arrow_field = schema.field(pos);
- let value = self.get_long(pos);
+ let column = self.record_batch.column(pos);
- match arrow_field.data_type() {
- ArrowDataType::Timestamp(time_unit, _) => {
- // Convert based on Arrow TimeUnit
- let (millis, nanos) = match time_unit {
- TimeUnit::Second => (value * 1000, 0),
- TimeUnit::Millisecond => (value, 0),
- TimeUnit::Microsecond => {
- let millis = value / 1000;
- let nanos = ((value % 1000) * 1000) as i32;
- (millis, nanos)
- }
- TimeUnit::Nanosecond => {
- let millis = value / 1_000_000;
- let nanos = (value % 1_000_000) as i32;
- (millis, nanos)
- }
- };
-
- if nanos == 0 {
- construct_compact(millis)
- } else {
- // nanos is guaranteed to be in valid range [0, 999_999] by arithmetic
- construct_with_nanos(millis, nanos)
- .expect("nanos in valid range by construction")
+ // Read value based on the actual Arrow timestamp type
+ let value = match arrow_field.data_type() {
+ ArrowDataType::Timestamp(TimeUnit::Second, _) => column
+ .as_any()
+ .downcast_ref::<TimestampSecondArray>()
+ .expect("Expected TimestampSecondArray")
+ .value(self.row_id),
+ ArrowDataType::Timestamp(TimeUnit::Millisecond, _) => column
+ .as_any()
+ .downcast_ref::<TimestampMillisecondArray>()
+ .expect("Expected TimestampMillisecondArray")
+ .value(self.row_id),
+ ArrowDataType::Timestamp(TimeUnit::Microsecond, _) => column
+ .as_any()
+ .downcast_ref::<TimestampMicrosecondArray>()
+ .expect("Expected TimestampMicrosecondArray")
+ .value(self.row_id),
+ ArrowDataType::Timestamp(TimeUnit::Nanosecond, _) => column
+ .as_any()
+ .downcast_ref::<TimestampNanosecondArray>()
+ .expect("Expected TimestampNanosecondArray")
+ .value(self.row_id),
+ other => panic!("Expected Timestamp column at position {pos}, got {other:?}"),
+ };
+
+ // Convert based on Arrow TimeUnit
+ let (millis, nanos) = match arrow_field.data_type() {
+ ArrowDataType::Timestamp(time_unit, _) => match time_unit {
+ TimeUnit::Second => (value * 1000, 0),
+ TimeUnit::Millisecond => (value, 0),
+ TimeUnit::Microsecond => {
+ // Use Euclidean division so that nanos is always non-negative,
+ // even for timestamps before the Unix epoch.
+ let millis = value.div_euclid(1000);
+ let nanos = (value.rem_euclid(1000) * 1000) as i32;
+ (millis, nanos)
+ }
+ TimeUnit::Nanosecond => {
+ // Use Euclidean division so that nanos is always in [0, 999_999].
+ let millis = value.div_euclid(1_000_000);
+ let nanos = value.rem_euclid(1_000_000) as i32;
+ (millis, nanos)
}
+ },
+ _ => unreachable!(),
+ };
+
+ if nanos == 0 {
+ construct_compact(millis)
+ } else {
+ // nanos is guaranteed to be in valid range [0, 999_999] by arithmetic
+ construct_with_nanos(millis, nanos).expect("nanos in valid range by construction")
+ }
+ }
+
+ /// Read date value from Arrow Date32Array
+ fn read_date_from_arrow(&self, pos: usize) -> i32 {
+ self.record_batch
+ .column(pos)
+ .as_any()
+ .downcast_ref::<Date32Array>()
+ .expect("Expected Date32Array")
+ .value(self.row_id)
+ }
+
+ /// Read time value from Arrow Time32/Time64 arrays, converting to milliseconds
+ fn read_time_from_arrow(&self, pos: usize) -> i32 {
+ let schema = self.record_batch.schema();
+ let arrow_field = schema.field(pos);
+ let column = self.record_batch.column(pos);
+
+ match arrow_field.data_type() {
+ ArrowDataType::Time32(TimeUnit::Second) => {
+ let value = column
+ .as_any()
+ .downcast_ref::<Time32SecondArray>()
+ .expect("Expected Time32SecondArray")
+ .value(self.row_id);
+ value * 1000 // Convert seconds to milliseconds
}
- other => panic!("Expected Timestamp column at position {pos}, got {other:?}"),
+ ArrowDataType::Time32(TimeUnit::Millisecond) => column
+ .as_any()
+ .downcast_ref::<Time32MillisecondArray>()
+ .expect("Expected Time32MillisecondArray")
+ .value(self.row_id),
+ ArrowDataType::Time64(TimeUnit::Microsecond) => {
+ let value = column
+ .as_any()
+ .downcast_ref::<Time64MicrosecondArray>()
+ .expect("Expected Time64MicrosecondArray")
+ .value(self.row_id);
+ (value / 1000) as i32 // Convert microseconds to milliseconds
+ }
+ ArrowDataType::Time64(TimeUnit::Nanosecond) => {
+ let value = column
+ .as_any()
+ .downcast_ref::<Time64NanosecondArray>()
+ .expect("Expected Time64NanosecondArray")
+ .value(self.row_id);
+ (value / 1_000_000) as i32 // Convert nanoseconds to milliseconds
+ }
+ other => panic!("Expected Time column at position {pos}, got {other:?}"),
}
}
}
@@ -220,11 +298,11 @@ impl InternalRow for ColumnarRow {
}
fn get_date(&self, pos: usize) -> crate::row::datum::Date {
- crate::row::datum::Date::new(self.get_int(pos))
+ crate::row::datum::Date::new(self.read_date_from_arrow(pos))
}
fn get_time(&self, pos: usize) -> crate::row::datum::Time {
- crate::row::datum::Time::new(self.get_int(pos))
+ crate::row::datum::Time::new(self.read_time_from_arrow(pos))
}
fn get_timestamp_ntz(&self, pos: usize, precision: u32) -> crate::row::datum::TimestampNtz {
@@ -250,16 +328,12 @@ impl InternalRow for ColumnarRow {
}
fn get_char(&self, pos: usize, _length: usize) -> &str {
- let array = self
- .record_batch
+ self.record_batch
.column(pos)
.as_any()
- .downcast_ref::<FixedSizeBinaryArray>()
- .expect("Expected fixed-size binary array for char type");
-
- let bytes = array.value(self.row_id);
- // don't check length, following java client
- std::str::from_utf8(bytes).expect("Invalid UTF-8 in char field")
+ .downcast_ref::<StringArray>()
+ .expect("Expected String array for char type")
+ .value(self.row_id)
}
fn get_string(&self, pos: usize) -> &str {
@@ -294,8 +368,8 @@ impl InternalRow for ColumnarRow {
mod tests {
use super::*;
use arrow::array::{
- BinaryArray, BooleanArray, FixedSizeBinaryArray, Float32Array, Float64Array, Int8Array,
- Int16Array, Int32Array, Int64Array, StringArray,
+ BinaryArray, BooleanArray, Float32Array, Float64Array, Int8Array, Int16Array, Int32Array,
+ Int64Array, StringArray,
};
use arrow::datatypes::{DataType, Field, Schema};
@@ -311,7 +385,7 @@ mod tests {
Field::new("f64", DataType::Float64, false),
Field::new("s", DataType::Utf8, false),
Field::new("bin", DataType::Binary, false),
- Field::new("char", DataType::FixedSizeBinary(2), false),
+ Field::new("char", DataType::Utf8, false),
]));
let batch = RecordBatch::try_new(
@@ -326,13 +400,7 @@ mod tests {
Arc::new(Float64Array::from(vec![2.5])),
Arc::new(StringArray::from(vec!["hello"])),
Arc::new(BinaryArray::from(vec![b"data".as_slice()])),
- Arc::new(
- FixedSizeBinaryArray::try_from_sparse_iter_with_size(
- vec![Some(b"ab".as_slice())].into_iter(),
- 2,
- )
- .expect("fixed array"),
- ),
+ Arc::new(StringArray::from(vec!["ab"])),
],
)
.expect("record batch");
diff --git a/crates/fluss/tests/integration/table.rs b/crates/fluss/tests/integration/table.rs
index 046ec02..6a15674 100644
--- a/crates/fluss/tests/integration/table.rs
+++ b/crates/fluss/tests/integration/table.rs
@@ -576,4 +576,395 @@ mod table_test {
// Projected batch should have 1 column (id), not 2 (id, name)
assert_eq!(proj_batches[0].batch().num_columns(), 1);
}
+
+ /// Integration test covering produce and scan operations for all supported datatypes
+ /// in log tables.
+ #[tokio::test]
+ async fn all_supported_datatypes() {
+ use fluss::row::{Date, Datum, Decimal, GenericRow, Time, TimestampLtz, TimestampNtz};
+
+ let cluster = get_fluss_cluster();
+ let connection = cluster.get_fluss_connection().await;
+
+ let admin = connection.get_admin().await.expect("Failed to get admin");
+
+ let table_path = TablePath::new("fluss".to_string(), "test_log_all_datatypes".to_string());
+
+ // Create a log table with all supported datatypes for append/scan
+ let table_descriptor = TableDescriptor::builder()
+ .schema(
+ Schema::builder()
+ // Integer types
+ .column("col_tinyint", DataTypes::tinyint())
+ .column("col_smallint", DataTypes::smallint())
+ .column("col_int", DataTypes::int())
+ .column("col_bigint", DataTypes::bigint())
+ // Floating point types
+ .column("col_float", DataTypes::float())
+ .column("col_double", DataTypes::double())
+ // Boolean type
+ .column("col_boolean", DataTypes::boolean())
+ // Char type
+ .column("col_char", DataTypes::char(10))
+ // String type
+ .column("col_string", DataTypes::string())
+ // Decimal type
+ .column("col_decimal", DataTypes::decimal(10, 2))
+ // Date type
+ .column("col_date", DataTypes::date())
+ // Time types
+ .column("col_time_s", DataTypes::time_with_precision(0))
+ .column("col_time_ms", DataTypes::time_with_precision(3))
+ .column("col_time_us", DataTypes::time_with_precision(6))
+ .column("col_time_ns", DataTypes::time_with_precision(9))
+ // Timestamp types
+ .column("col_timestamp_s", DataTypes::timestamp_with_precision(0))
+ .column("col_timestamp_ms", DataTypes::timestamp_with_precision(3))
+ .column("col_timestamp_us", DataTypes::timestamp_with_precision(6))
+ .column("col_timestamp_ns", DataTypes::timestamp_with_precision(9))
+ // Timestamp_ltz types
+ .column(
+ "col_timestamp_ltz_s",
+ DataTypes::timestamp_ltz_with_precision(0),
+ )
+ .column(
+ "col_timestamp_ltz_ms",
+ DataTypes::timestamp_ltz_with_precision(3),
+ )
+ .column(
+ "col_timestamp_ltz_us",
+ DataTypes::timestamp_ltz_with_precision(6),
+ )
+ .column(
+ "col_timestamp_ltz_ns",
+ DataTypes::timestamp_ltz_with_precision(9),
+ )
+ // Bytes type
+ .column("col_bytes", DataTypes::bytes())
+ // Timestamp types with negative values (before Unix epoch)
+ .column(
+ "col_timestamp_us_neg",
+ DataTypes::timestamp_with_precision(6),
+ )
+ .column(
+ "col_timestamp_ns_neg",
+ DataTypes::timestamp_with_precision(9),
+ )
+ .column(
+ "col_timestamp_ltz_us_neg",
+ DataTypes::timestamp_ltz_with_precision(6),
+ )
+ .column(
+ "col_timestamp_ltz_ns_neg",
+ DataTypes::timestamp_ltz_with_precision(9),
+ )
+ .build()
+ .expect("Failed to build schema"),
+ )
+ .build()
+ .expect("Failed to build table");
+
+ create_table(&admin, &table_path, &table_descriptor).await;
+
+ let table = connection
+ .get_table(&table_path)
+ .await
+ .expect("Failed to get table");
+
+ let field_count = table.table_info().schema.columns().len();
+
+ let append_writer = table
+ .new_append()
+ .expect("Failed to create append")
+ .create_writer();
+
+ // Test data for all datatypes
+ let col_tinyint = 127i8;
+ let col_smallint = 32767i16;
+ let col_int = 2147483647i32;
+ let col_bigint = 9223372036854775807i64;
+ let col_float = 3.14f32;
+ let col_double = 2.718281828459045f64;
+ let col_boolean = true;
+ let col_char = "hello";
+ let col_string = "world of fluss rust client";
+ let col_decimal = Decimal::from_unscaled_long(12345, 10, 2).unwrap(); // 123.45
+ let col_date = Date::new(20476); // 2026-01-23
+ let col_time_s = Time::new(36827000); // 10:13:47
+ let col_time_ms = Time::new(36827123); // 10:13:47.123
+ let col_time_us = Time::new(86399999); // 23:59:59.999
+ let col_time_ns = Time::new(1); // 00:00:00.001
+ // 2026-01-23 10:13:47 UTC
+ let col_timestamp_s = TimestampNtz::new(1769163227000);
+ // 2026-01-23 10:13:47.123 UTC
+ let col_timestamp_ms = TimestampNtz::new(1769163227123);
+ // 2026-01-23 10:13:47.123456 UTC
+ let col_timestamp_us = TimestampNtz::from_millis_nanos(1769163227123, 456000).unwrap();
+ // 2026-01-23 10:13:47.123999999 UTC
+ let col_timestamp_ns = TimestampNtz::from_millis_nanos(1769163227123, 999_999).unwrap();
+ let col_timestamp_ltz_s = TimestampLtz::new(1769163227000);
+ let col_timestamp_ltz_ms = TimestampLtz::new(1769163227123);
+ let col_timestamp_ltz_us = TimestampLtz::from_millis_nanos(1769163227123, 456000).unwrap();
+ let col_timestamp_ltz_ns = TimestampLtz::from_millis_nanos(1769163227123, 999_999).unwrap();
+ let col_bytes: Vec<u8> = b"binary data".to_vec();
+
+ // 1960-06-15 08:30:45.123456 UTC (before 1970)
+ let col_timestamp_us_neg = TimestampNtz::from_millis_nanos(-301234154877, 456000).unwrap();
+ // 1960-06-15 08:30:45.123999999 UTC (before 1970)
+ let col_timestamp_ns_neg = TimestampNtz::from_millis_nanos(-301234154877, 999_999).unwrap();
+ let col_timestamp_ltz_us_neg =
+ TimestampLtz::from_millis_nanos(-301234154877, 456000).unwrap();
+ let col_timestamp_ltz_ns_neg =
+ TimestampLtz::from_millis_nanos(-301234154877, 999_999).unwrap();
+
+ // Append a row with all datatypes
+ let mut row = GenericRow::new(field_count);
+ row.set_field(0, col_tinyint);
+ row.set_field(1, col_smallint);
+ row.set_field(2, col_int);
+ row.set_field(3, col_bigint);
+ row.set_field(4, col_float);
+ row.set_field(5, col_double);
+ row.set_field(6, col_boolean);
+ row.set_field(7, col_char);
+ row.set_field(8, col_string);
+ row.set_field(9, col_decimal.clone());
+ row.set_field(10, col_date);
+ row.set_field(11, col_time_s);
+ row.set_field(12, col_time_ms);
+ row.set_field(13, col_time_us);
+ row.set_field(14, col_time_ns);
+ row.set_field(15, col_timestamp_s);
+ row.set_field(16, col_timestamp_ms);
+ row.set_field(17, col_timestamp_us.clone());
+ row.set_field(18, col_timestamp_ns.clone());
+ row.set_field(19, col_timestamp_ltz_s);
+ row.set_field(20, col_timestamp_ltz_ms);
+ row.set_field(21, col_timestamp_ltz_us.clone());
+ row.set_field(22, col_timestamp_ltz_ns.clone());
+ row.set_field(23, col_bytes.as_slice());
+ row.set_field(24, col_timestamp_us_neg.clone());
+ row.set_field(25, col_timestamp_ns_neg.clone());
+ row.set_field(26, col_timestamp_ltz_us_neg.clone());
+ row.set_field(27, col_timestamp_ltz_ns_neg.clone());
+
+ append_writer
+ .append(row)
+ .await
+ .expect("Failed to append row with all datatypes");
+
+ // Append a row with null values for all columns
+ let mut row_with_nulls = GenericRow::new(field_count);
+ for i in 0..field_count {
+ row_with_nulls.set_field(i, Datum::Null);
+ }
+
+ append_writer
+ .append(row_with_nulls)
+ .await
+ .expect("Failed to append row with nulls");
+
+ append_writer.flush().await.expect("Failed to flush");
+
+ // Scan the records
+ let records = scan_table(&table, |scan| scan).await;
+
+ assert_eq!(records.len(), 2, "Expected 2 records");
+
+ let found_row = records[0].row();
+ assert_eq!(found_row.get_byte(0), col_tinyint, "col_tinyint mismatch");
+ assert_eq!(
+ found_row.get_short(1),
+ col_smallint,
+ "col_smallint mismatch"
+ );
+ assert_eq!(found_row.get_int(2), col_int, "col_int mismatch");
+ assert_eq!(found_row.get_long(3), col_bigint, "col_bigint mismatch");
+ assert!(
+ (found_row.get_float(4) - col_float).abs() < f32::EPSILON,
+ "col_float mismatch: expected {}, got {}",
+ col_float,
+ found_row.get_float(4)
+ );
+ assert!(
+ (found_row.get_double(5) - col_double).abs() < f64::EPSILON,
+ "col_double mismatch: expected {}, got {}",
+ col_double,
+ found_row.get_double(5)
+ );
+ assert_eq!(
+ found_row.get_boolean(6),
+ col_boolean,
+ "col_boolean mismatch"
+ );
+ assert_eq!(found_row.get_char(7, 10), col_char, "col_char mismatch");
+ assert_eq!(found_row.get_string(8), col_string, "col_string mismatch");
+ assert_eq!(
+ found_row.get_decimal(9, 10, 2),
+ col_decimal,
+ "col_decimal mismatch"
+ );
+ assert_eq!(
+ found_row.get_date(10).get_inner(),
+ col_date.get_inner(),
+ "col_date mismatch"
+ );
+
+ assert_eq!(
+ found_row.get_time(11).get_inner(),
+ col_time_s.get_inner(),
+ "col_time_s mismatch"
+ );
+
+ assert_eq!(
+ found_row.get_time(12).get_inner(),
+ col_time_ms.get_inner(),
+ "col_time_ms mismatch"
+ );
+
+ assert_eq!(
+ found_row.get_time(13).get_inner(),
+ col_time_us.get_inner(),
+ "col_time_us mismatch"
+ );
+
+ assert_eq!(
+ found_row.get_time(14).get_inner(),
+ col_time_ns.get_inner(),
+ "col_time_ns mismatch"
+ );
+
+ assert_eq!(
+ found_row.get_timestamp_ntz(15, 0).get_millisecond(),
+ col_timestamp_s.get_millisecond(),
+ "col_timestamp_s mismatch"
+ );
+
+ assert_eq!(
+ found_row.get_timestamp_ntz(16, 3).get_millisecond(),
+ col_timestamp_ms.get_millisecond(),
+ "col_timestamp_ms mismatch"
+ );
+
+ let read_ts_us = found_row.get_timestamp_ntz(17, 6);
+ assert_eq!(
+ read_ts_us.get_millisecond(),
+ col_timestamp_us.get_millisecond(),
+ "col_timestamp_us millis mismatch"
+ );
+ assert_eq!(
+ read_ts_us.get_nano_of_millisecond(),
+ col_timestamp_us.get_nano_of_millisecond(),
+ "col_timestamp_us nanos mismatch"
+ );
+
+ let read_ts_ns = found_row.get_timestamp_ntz(18, 9);
+ assert_eq!(
+ read_ts_ns.get_millisecond(),
+ col_timestamp_ns.get_millisecond(),
+ "col_timestamp_ns millis mismatch"
+ );
+ assert_eq!(
+ read_ts_ns.get_nano_of_millisecond(),
+ col_timestamp_ns.get_nano_of_millisecond(),
+ "col_timestamp_ns nanos mismatch"
+ );
+
+ assert_eq!(
+ found_row.get_timestamp_ltz(19, 0).get_epoch_millisecond(),
+ col_timestamp_ltz_s.get_epoch_millisecond(),
+ "col_timestamp_ltz_s mismatch"
+ );
+
+ assert_eq!(
+ found_row.get_timestamp_ltz(20, 3).get_epoch_millisecond(),
+ col_timestamp_ltz_ms.get_epoch_millisecond(),
+ "col_timestamp_ltz_ms mismatch"
+ );
+
+ let read_ts_ltz_us = found_row.get_timestamp_ltz(21, 6);
+ assert_eq!(
+ read_ts_ltz_us.get_epoch_millisecond(),
+ col_timestamp_ltz_us.get_epoch_millisecond(),
+ "col_timestamp_ltz_us millis mismatch"
+ );
+ assert_eq!(
+ read_ts_ltz_us.get_nano_of_millisecond(),
+ col_timestamp_ltz_us.get_nano_of_millisecond(),
+ "col_timestamp_ltz_us nanos mismatch"
+ );
+
+ let read_ts_ltz_ns = found_row.get_timestamp_ltz(22, 9);
+ assert_eq!(
+ read_ts_ltz_ns.get_epoch_millisecond(),
+ col_timestamp_ltz_ns.get_epoch_millisecond(),
+ "col_timestamp_ltz_ns millis mismatch"
+ );
+ assert_eq!(
+ read_ts_ltz_ns.get_nano_of_millisecond(),
+ col_timestamp_ltz_ns.get_nano_of_millisecond(),
+ "col_timestamp_ltz_ns nanos mismatch"
+ );
+ assert_eq!(found_row.get_bytes(23), col_bytes, "col_bytes mismatch");
+
+ // Verify timestamps before Unix epoch (negative timestamps)
+ let read_ts_us_neg = found_row.get_timestamp_ntz(24, 6);
+ assert_eq!(
+ read_ts_us_neg.get_millisecond(),
+ col_timestamp_us_neg.get_millisecond(),
+ "col_timestamp_us_neg millis mismatch"
+ );
+ assert_eq!(
+ read_ts_us_neg.get_nano_of_millisecond(),
+ col_timestamp_us_neg.get_nano_of_millisecond(),
+ "col_timestamp_us_neg nanos mismatch"
+ );
+
+ let read_ts_ns_neg = found_row.get_timestamp_ntz(25, 9);
+ assert_eq!(
+ read_ts_ns_neg.get_millisecond(),
+ col_timestamp_ns_neg.get_millisecond(),
+ "col_timestamp_ns_neg millis mismatch"
+ );
+ assert_eq!(
+ read_ts_ns_neg.get_nano_of_millisecond(),
+ col_timestamp_ns_neg.get_nano_of_millisecond(),
+ "col_timestamp_ns_neg nanos mismatch"
+ );
+
+ let read_ts_ltz_us_neg = found_row.get_timestamp_ltz(26, 6);
+ assert_eq!(
+ read_ts_ltz_us_neg.get_epoch_millisecond(),
+ col_timestamp_ltz_us_neg.get_epoch_millisecond(),
+ "col_timestamp_ltz_us_neg millis mismatch"
+ );
+ assert_eq!(
+ read_ts_ltz_us_neg.get_nano_of_millisecond(),
+ col_timestamp_ltz_us_neg.get_nano_of_millisecond(),
+ "col_timestamp_ltz_us_neg nanos mismatch"
+ );
+
+ let read_ts_ltz_ns_neg = found_row.get_timestamp_ltz(27, 9);
+ assert_eq!(
+ read_ts_ltz_ns_neg.get_epoch_millisecond(),
+ col_timestamp_ltz_ns_neg.get_epoch_millisecond(),
+ "col_timestamp_ltz_ns_neg millis mismatch"
+ );
+ assert_eq!(
+ read_ts_ltz_ns_neg.get_nano_of_millisecond(),
+ col_timestamp_ltz_ns_neg.get_nano_of_millisecond(),
+ "col_timestamp_ltz_ns_neg nanos mismatch"
+ );
+
+ // Verify row with all nulls (record index 1)
+ let found_row_nulls = records[1].row();
+ for i in 0..field_count {
+ assert!(found_row_nulls.is_null_at(i), "column {} should be null", i);
+ }
+
+ admin
+ .drop_table(&table_path, false)
+ .await
+ .expect("Failed to drop table");
+ }
}