Copilot commented on code in PR #205:
URL: https://github.com/apache/fluss-rust/pull/205#discussion_r2723467007
##########
crates/fluss/src/row/column.rs:
##########
@@ -67,35 +70,107 @@ impl ColumnarRow {
) -> T {
let schema = self.record_batch.schema();
let arrow_field = schema.field(pos);
- let value = self.get_long(pos);
+ let column = self.record_batch.column(pos);
- match arrow_field.data_type() {
- ArrowDataType::Timestamp(time_unit, _) => {
- // Convert based on Arrow TimeUnit
- let (millis, nanos) = match time_unit {
- TimeUnit::Second => (value * 1000, 0),
- TimeUnit::Millisecond => (value, 0),
- TimeUnit::Microsecond => {
- let millis = value / 1000;
- let nanos = ((value % 1000) * 1000) as i32;
- (millis, nanos)
- }
- TimeUnit::Nanosecond => {
- let millis = value / 1_000_000;
- let nanos = (value % 1_000_000) as i32;
- (millis, nanos)
- }
- };
-
- if nanos == 0 {
- construct_compact(millis)
- } else {
- // nanos is guaranteed to be in valid range [0, 999_999]
by arithmetic
- construct_with_nanos(millis, nanos)
- .expect("nanos in valid range by construction")
+ // Read value based on the actual Arrow timestamp type
+ let value = match arrow_field.data_type() {
+ ArrowDataType::Timestamp(TimeUnit::Second, _) => column
+ .as_any()
+ .downcast_ref::<TimestampSecondArray>()
+ .expect("Expected TimestampSecondArray")
+ .value(self.row_id),
+ ArrowDataType::Timestamp(TimeUnit::Millisecond, _) => column
+ .as_any()
+ .downcast_ref::<TimestampMillisecondArray>()
+ .expect("Expected TimestampMillisecondArray")
+ .value(self.row_id),
+ ArrowDataType::Timestamp(TimeUnit::Microsecond, _) => column
+ .as_any()
+ .downcast_ref::<TimestampMicrosecondArray>()
+ .expect("Expected TimestampMicrosecondArray")
+ .value(self.row_id),
+ ArrowDataType::Timestamp(TimeUnit::Nanosecond, _) => column
+ .as_any()
+ .downcast_ref::<TimestampNanosecondArray>()
+ .expect("Expected TimestampNanosecondArray")
+ .value(self.row_id),
+ other => panic!("Expected Timestamp column at position {pos}, got
{other:?}"),
+ };
+
+ // Convert based on Arrow TimeUnit
+ let (millis, nanos) = match arrow_field.data_type() {
+ ArrowDataType::Timestamp(time_unit, _) => match time_unit {
+ TimeUnit::Second => (value * 1000, 0),
+ TimeUnit::Millisecond => (value, 0),
+ TimeUnit::Microsecond => {
+ let millis = value / 1000;
+ let nanos = ((value % 1000) * 1000) as i32;
+ (millis, nanos)
}
+ TimeUnit::Nanosecond => {
+ let millis = value / 1_000_000;
+ let nanos = (value % 1_000_000) as i32;
Review Comment:
The conversion from Arrow timestamp values to `(millis, nanos)` for
microsecond and nanosecond units does not handle negative timestamps correctly:
Rust's `/` and `%` truncate toward zero, so on a negative `value` they can
produce a negative `nanos` component (for example, `value = -1` in microseconds
yields `millis = 0` and `nanos = -1000`), which will cause
`TimestampNtz::from_millis_nanos`/`TimestampLtz::from_millis_nanos` to panic
even for valid Arrow values. To make this robust for timestamps before the Unix
epoch, consider using `div_euclid`/`rem_euclid` (or an equivalent
normalization) so that `nanos` is always in `[0, 999_999]` and `millis` is
adjusted accordingly (the same `-1` microsecond input then becomes
`millis = -1` and `nanos = 999_000`).
```suggestion
// Use Euclidean division so that nanos is always
non-negative,
// even for timestamps before the Unix epoch.
let millis = value.div_euclid(1000);
let nanos = (value.rem_euclid(1000) * 1000) as i32;
(millis, nanos)
}
TimeUnit::Nanosecond => {
// Use Euclidean division so that nanos is always in [0,
999_999].
let millis = value.div_euclid(1_000_000);
let nanos = value.rem_euclid(1_000_000) as i32;
```
##########
crates/fluss/tests/integration/table.rs:
##########
@@ -570,4 +570,313 @@ mod table_test {
// Projected batch should have 1 column (id), not 2 (id, name)
assert_eq!(proj_batches[0].num_columns(), 1);
}
+
+ /// Integration test covering produce and scan operations for all
supported datatypes
+ /// in log tables.
+ #[tokio::test]
+ async fn all_supported_datatypes() {
+ use fluss::row::{Date, Datum, Decimal, GenericRow, Time, TimestampLtz,
TimestampNtz};
+
+ let cluster = get_fluss_cluster();
+ let connection = cluster.get_fluss_connection().await;
+
+ let admin = connection.get_admin().await.expect("Failed to get admin");
+
+ let table_path = TablePath::new("fluss".to_string(),
"test_log_all_datatypes".to_string());
+
+ // Create a log table with all supported datatypes for append/scan
+ let table_descriptor = TableDescriptor::builder()
+ .schema(
+ Schema::builder()
+ // Integer types
+ .column("col_tinyint", DataTypes::tinyint())
+ .column("col_smallint", DataTypes::smallint())
+ .column("col_int", DataTypes::int())
+ .column("col_bigint", DataTypes::bigint())
+ // Floating point types
+ .column("col_float", DataTypes::float())
+ .column("col_double", DataTypes::double())
+ // Boolean type
+ .column("col_boolean", DataTypes::boolean())
+ // Char type
+ .column("col_char", DataTypes::char(10))
+ // String type
+ .column("col_string", DataTypes::string())
+ // Decimal type
+ .column("col_decimal", DataTypes::decimal(10, 2))
+ // Date type
+ .column("col_date", DataTypes::date())
+ // Time types
+ .column("col_time_s", DataTypes::time_with_precision(0))
+ .column("col_time_ms", DataTypes::time_with_precision(3))
+ .column("col_time_us", DataTypes::time_with_precision(6))
+ .column("col_time_ns", DataTypes::time_with_precision(9))
+ // Timestamp types
+ .column("col_timestamp_s",
DataTypes::timestamp_with_precision(0))
+ .column("col_timestamp_ms",
DataTypes::timestamp_with_precision(3))
+ .column("col_timestamp_us",
DataTypes::timestamp_with_precision(6))
+ .column("col_timestamp_ns",
DataTypes::timestamp_with_precision(9))
+ // Timestamp_ltz types
+ .column(
+ "col_timestamp_ltz_s",
+ DataTypes::timestamp_ltz_with_precision(0),
+ )
+ .column(
+ "col_timestamp_ltz_ms",
+ DataTypes::timestamp_ltz_with_precision(3),
+ )
+ .column(
+ "col_timestamp_ltz_us",
+ DataTypes::timestamp_ltz_with_precision(6),
+ )
+ .column(
+ "col_timestamp_ltz_ns",
+ DataTypes::timestamp_ltz_with_precision(9),
+ )
+ // Bytes type
+ .column("col_bytes", DataTypes::bytes())
+ .build()
+ .expect("Failed to build schema"),
+ )
+ .build()
+ .expect("Failed to build table");
+
+ create_table(&admin, &table_path, &table_descriptor).await;
+
+ let table = connection
+ .get_table(&table_path)
+ .await
+ .expect("Failed to get table");
+
+ let append_writer = table
+ .new_append()
+ .expect("Failed to create append")
+ .create_writer();
+
+ // Test data for all datatypes
+ let col_tinyint = 127i8;
+ let col_smallint = 32767i16;
+ let col_int = 2147483647i32;
+ let col_bigint = 9223372036854775807i64;
+ let col_float = 3.14f32;
+ let col_double = 2.718281828459045f64;
+ let col_boolean = true;
+ let col_char = "hello";
+ let col_string = "world of fluss rust client";
+ let col_decimal = Decimal::from_unscaled_long(12345, 10, 2).unwrap();
// 123.45
+ let col_date = Date::new(20476); // 2026-01-23
+ let col_time_s = Time::new(36827000); // 10:13:47
+ let col_time_ms = Time::new(36827123); // 10:13:47.123
+ let col_time_us = Time::new(86399999); // 23:59:59.999
+ let col_time_ns = Time::new(1); // 00:00:00.001
+ // 2026-01-23 10:13:47 UTC
+ let col_timestamp_s = TimestampNtz::new(1769163227000);
+ // 2026-01-23 10:13:47.123 UTC
+ let col_timestamp_ms = TimestampNtz::new(1769163227123);
+ // 2026-01-23 10:13:47.123456 UTC
+ let col_timestamp_us = TimestampNtz::from_millis_nanos(1769163227123,
456000).unwrap();
+ // 2026-01-23 10:13:47.123999999 UTC
+ let col_timestamp_ns = TimestampNtz::from_millis_nanos(1769163227123,
999_999).unwrap();
+ let col_timestamp_ltz_s = TimestampLtz::new(1769163227000);
+ let col_timestamp_ltz_ms = TimestampLtz::new(1769163227123);
+ let col_timestamp_ltz_us =
TimestampLtz::from_millis_nanos(1769163227123, 456000).unwrap();
+ let col_timestamp_ltz_ns =
TimestampLtz::from_millis_nanos(1769163227123, 999_999).unwrap();
+ let col_bytes: Vec<u8> = b"binary data".to_vec();
+
+ // Append a row with all datatypes
+ let mut row = GenericRow::new();
+ row.set_field(0, col_tinyint);
+ row.set_field(1, col_smallint);
+ row.set_field(2, col_int);
+ row.set_field(3, col_bigint);
+ row.set_field(4, col_float);
+ row.set_field(5, col_double);
+ row.set_field(6, col_boolean);
+ row.set_field(7, col_char);
+ row.set_field(8, col_string);
+ row.set_field(9, col_decimal.clone());
+ row.set_field(10, col_date);
+ row.set_field(11, col_time_s);
+ row.set_field(12, col_time_ms);
+ row.set_field(13, col_time_us);
+ row.set_field(14, col_time_ns);
+ row.set_field(15, col_timestamp_s);
+ row.set_field(16, col_timestamp_ms);
+ row.set_field(17, col_timestamp_us.clone());
+ row.set_field(18, col_timestamp_ns.clone());
+ row.set_field(19, col_timestamp_ltz_s);
+ row.set_field(20, col_timestamp_ltz_ms);
+ row.set_field(21, col_timestamp_ltz_us.clone());
+ row.set_field(22, col_timestamp_ltz_ns.clone());
+ row.set_field(23, col_bytes.as_slice());
+
+ append_writer
+ .append(row)
+ .await
+ .expect("Failed to append row with all datatypes");
+
+ // Append a row with null values for all columns
+ let mut row_with_nulls = GenericRow::new();
+ for i in 0..24 {
Review Comment:
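The hard-coded loop bound `24` is tightly coupled to the number of columns
defined in the schema above; if the schema is extended or changed, this test
can silently become incomplete or incorrect. It would be more robust to derive
the column count (e.g., from the schema or `row.get_field_count()`) and use
that in both the null-population loop and the subsequent null assertions.
Note that `row` is passed by value to `append_writer.append(row)` above, so if
that call takes ownership of the row, the field count needs to be captured
before the append (or derived from the schema instead).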
```suggestion
let field_count = row.get_field_count();
let mut row_with_nulls = GenericRow::new();
for i in 0..field_count {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]