This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new e09250c test: add it test for put & get kv for all supported
datatypes (#204)
e09250c is described below
commit e09250c918e7b910d36dbcb5841b8fc6bfc0fea4
Author: SkylerLin <[email protected]>
AuthorDate: Sat Jan 24 10:37:34 2026 +0800
test: add it test for put & get kv for all supported datatypes (#204)
---
crates/fluss/tests/integration/kv_table.rs | 248 +++++++++++++++++++++++++++++
1 file changed, 248 insertions(+)
diff --git a/crates/fluss/tests/integration/kv_table.rs
b/crates/fluss/tests/integration/kv_table.rs
index efd7957..3f46f9f 100644
--- a/crates/fluss/tests/integration/kv_table.rs
+++ b/crates/fluss/tests/integration/kv_table.rs
@@ -441,4 +441,252 @@ mod kv_table_test {
.await
.expect("Failed to drop table");
}
+
+ /// Integration test covering put and get operations for all supported
datatypes.
+ #[tokio::test]
+ async fn all_supported_datatypes() {
+ use fluss::row::{Date, Datum, Decimal, Time, TimestampLtz,
TimestampNtz};
+
+ let cluster = get_fluss_cluster();
+ let connection = cluster.get_fluss_connection().await;
+
+ let admin = connection.get_admin().await.expect("Failed to get admin");
+
+ let table_path = TablePath::new("fluss".to_string(),
"test_all_datatypes".to_string());
+
+ // Create a table with all supported primitive datatypes
+ let table_descriptor = TableDescriptor::builder()
+ .schema(
+ Schema::builder()
+ // Primary key column
+ .column("pk_int", DataTypes::int())
+ // Boolean type
+ .column("col_boolean", DataTypes::boolean())
+ // Integer types
+ .column("col_tinyint", DataTypes::tinyint())
+ .column("col_smallint", DataTypes::smallint())
+ .column("col_int", DataTypes::int())
+ .column("col_bigint", DataTypes::bigint())
+ // Floating point types
+ .column("col_float", DataTypes::float())
+ .column("col_double", DataTypes::double())
+ // String types
+ .column("col_char", DataTypes::char(10))
+ .column("col_string", DataTypes::string())
+ // Decimal type
+ .column("col_decimal", DataTypes::decimal(10, 2))
+ // Date and time types
+ .column("col_date", DataTypes::date())
+ .column("col_time", DataTypes::time())
+ .column("col_timestamp", DataTypes::timestamp())
+ .column("col_timestamp_ltz", DataTypes::timestamp_ltz())
+ // Binary types
+ .column("col_bytes", DataTypes::bytes())
+ .column("col_binary", DataTypes::binary(20))
+ .primary_key(vec!["pk_int".to_string()])
+ .build()
+ .expect("Failed to build schema"),
+ )
+ .build()
+ .expect("Failed to build table");
+
+ create_table(&admin, &table_path, &table_descriptor).await;
+
+ let table = connection
+ .get_table(&table_path)
+ .await
+ .expect("Failed to get table");
+
+ let table_upsert = table.new_upsert().expect("Failed to create
upsert");
+ let mut upsert_writer = table_upsert
+ .create_writer()
+ .expect("Failed to create writer");
+
+ // Test data for all datatypes
+ let pk_int = 1i32;
+ let col_boolean = true;
+ let col_tinyint = 127i8;
+ let col_smallint = 32767i16;
+ let col_int = 2147483647i32;
+ let col_bigint = 9223372036854775807i64;
+ let col_float = 3.14f32;
+ let col_double = 2.718281828459045f64;
+ let col_char = "hello";
+ let col_string = "world of fluss rust client";
+ let col_decimal = Decimal::from_unscaled_long(12345, 10, 2).unwrap();
// 123.45
+ let col_date = Date::new(20476); // 2026-01-23
+ let col_time = Time::new(36827123); // 10:13:47.123
+ let col_timestamp = TimestampNtz::new(1769163227123); // 2026-01-23
10:13:47.123 UTC
+ let col_timestamp_ltz = TimestampLtz::new(1769163227123); //
2026-01-23 10:13:47.123 UTC
+ let col_bytes: &[u8] = b"binary data";
+ let col_binary: &[u8] = b"fixed binary data!!!";
+
+ // Upsert a row with all datatypes
+ let mut row = GenericRow::new();
+ row.set_field(0, pk_int);
+ row.set_field(1, col_boolean);
+ row.set_field(2, col_tinyint);
+ row.set_field(3, col_smallint);
+ row.set_field(4, col_int);
+ row.set_field(5, col_bigint);
+ row.set_field(6, col_float);
+ row.set_field(7, col_double);
+ row.set_field(8, col_char);
+ row.set_field(9, col_string);
+ row.set_field(10, col_decimal.clone());
+ row.set_field(11, col_date);
+ row.set_field(12, col_time);
+ row.set_field(13, col_timestamp);
+ row.set_field(14, col_timestamp_ltz);
+ row.set_field(15, col_bytes);
+ row.set_field(16, col_binary);
+
+ upsert_writer
+ .upsert(&row)
+ .await
+ .expect("Failed to upsert row with all datatypes");
+
+ // Lookup the record
+ let mut lookuper = table
+ .new_lookup()
+ .expect("Failed to create lookup")
+ .create_lookuper()
+ .expect("Failed to create lookuper");
+
+ let mut key = GenericRow::new();
+ key.set_field(0, pk_int);
+
+ let result = lookuper.lookup(&key).await.expect("Failed to lookup");
+ let found_row = result
+ .get_single_row()
+ .expect("Failed to get row")
+ .expect("Row should exist");
+
+ // Verify all datatypes
+ assert_eq!(found_row.get_int(0), pk_int, "pk_int mismatch");
+ assert_eq!(
+ found_row.get_boolean(1),
+ col_boolean,
+ "col_boolean mismatch"
+ );
+ assert_eq!(found_row.get_byte(2), col_tinyint, "col_tinyint mismatch");
+ assert_eq!(
+ found_row.get_short(3),
+ col_smallint,
+ "col_smallint mismatch"
+ );
+ assert_eq!(found_row.get_int(4), col_int, "col_int mismatch");
+ assert_eq!(found_row.get_long(5), col_bigint, "col_bigint mismatch");
+ assert!(
+ (found_row.get_float(6) - col_float).abs() < f32::EPSILON,
+ "col_float mismatch: expected {}, got {}",
+ col_float,
+ found_row.get_float(6)
+ );
+ assert!(
+ (found_row.get_double(7) - col_double).abs() < f64::EPSILON,
+ "col_double mismatch: expected {}, got {}",
+ col_double,
+ found_row.get_double(7)
+ );
+ assert_eq!(found_row.get_char(8, 10), col_char, "col_char mismatch");
+ assert_eq!(found_row.get_string(9), col_string, "col_string mismatch");
+ assert_eq!(
+ found_row.get_decimal(10, 10, 2),
+ col_decimal,
+ "col_decimal mismatch"
+ );
+ assert_eq!(
+ found_row.get_date(11).get_inner(),
+ col_date.get_inner(),
+ "col_date mismatch"
+ );
+ assert_eq!(
+ found_row.get_time(12).get_inner(),
+ col_time.get_inner(),
+ "col_time mismatch"
+ );
+ assert_eq!(
+ found_row.get_timestamp_ntz(13, 6).get_millisecond(),
+ col_timestamp.get_millisecond(),
+ "col_timestamp mismatch"
+ );
+ assert_eq!(
+ found_row.get_timestamp_ltz(14, 6).get_epoch_millisecond(),
+ col_timestamp_ltz.get_epoch_millisecond(),
+ "col_timestamp_ltz mismatch"
+ );
+ assert_eq!(found_row.get_bytes(15), col_bytes, "col_bytes mismatch");
+ assert_eq!(
+ found_row.get_binary(16, 20),
+ col_binary,
+ "col_binary mismatch"
+ );
+
+ // Test with null values for nullable columns
+ let pk_int_2 = 2i32;
+ let mut row_with_nulls = GenericRow::new();
+ row_with_nulls.set_field(0, pk_int_2);
+ row_with_nulls.set_field(1, Datum::Null); // col_boolean
+ row_with_nulls.set_field(2, Datum::Null); // col_tinyint
+ row_with_nulls.set_field(3, Datum::Null); // col_smallint
+ row_with_nulls.set_field(4, Datum::Null); // col_int
+ row_with_nulls.set_field(5, Datum::Null); // col_bigint
+ row_with_nulls.set_field(6, Datum::Null); // col_float
+ row_with_nulls.set_field(7, Datum::Null); // col_double
+ row_with_nulls.set_field(8, Datum::Null); // col_char
+ row_with_nulls.set_field(9, Datum::Null); // col_string
+ row_with_nulls.set_field(10, Datum::Null); // col_decimal
+ row_with_nulls.set_field(11, Datum::Null); // col_date
+ row_with_nulls.set_field(12, Datum::Null); // col_time
+ row_with_nulls.set_field(13, Datum::Null); // col_timestamp
+ row_with_nulls.set_field(14, Datum::Null); // col_timestamp_ltz
+ row_with_nulls.set_field(15, Datum::Null); // col_bytes
+ row_with_nulls.set_field(16, Datum::Null); // col_binary
+
+ upsert_writer
+ .upsert(&row_with_nulls)
+ .await
+ .expect("Failed to upsert row with nulls");
+
+ // Lookup row with nulls
+ let mut key2 = GenericRow::new();
+ key2.set_field(0, pk_int_2);
+
+ let result = lookuper.lookup(&key2).await.expect("Failed to lookup");
+ let found_row_nulls = result
+ .get_single_row()
+ .expect("Failed to get row")
+ .expect("Row should exist");
+
+ // Verify all nullable columns are null
+ assert_eq!(found_row_nulls.get_int(0), pk_int_2, "pk_int mismatch");
+ assert!(found_row_nulls.is_null_at(1), "col_boolean should be null");
+ assert!(found_row_nulls.is_null_at(2), "col_tinyint should be null");
+ assert!(found_row_nulls.is_null_at(3), "col_smallint should be null");
+ assert!(found_row_nulls.is_null_at(4), "col_int should be null");
+ assert!(found_row_nulls.is_null_at(5), "col_bigint should be null");
+ assert!(found_row_nulls.is_null_at(6), "col_float should be null");
+ assert!(found_row_nulls.is_null_at(7), "col_double should be null");
+ assert!(found_row_nulls.is_null_at(8), "col_char should be null");
+ assert!(found_row_nulls.is_null_at(9), "col_string should be null");
+ assert!(found_row_nulls.is_null_at(10), "col_decimal should be null");
+ assert!(found_row_nulls.is_null_at(11), "col_date should be null");
+ assert!(found_row_nulls.is_null_at(12), "col_time should be null");
+ assert!(
+ found_row_nulls.is_null_at(13),
+ "col_timestamp should be null"
+ );
+ assert!(
+ found_row_nulls.is_null_at(14),
+ "col_timestamp_ltz should be null"
+ );
+ assert!(found_row_nulls.is_null_at(15), "col_bytes should be null");
+ assert!(found_row_nulls.is_null_at(16), "col_binary should be null");
+
+ admin
+ .drop_table(&table_path, false)
+ .await
+ .expect("Failed to drop table");
+ }
}