This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new db72701 chore: add integration test for list offsets (#52)
db72701 is described below
commit db727017c71acb147bb36eda366bb18c3c831b96
Author: Pavlos-Petros Tournaris <[email protected]>
AuthorDate: Thu Dec 25 16:47:42 2025 +0200
chore: add integration test for list offsets (#52)
---------
Co-authored-by: luoyuxia <[email protected]>
---
crates/fluss/tests/integration/table.rs | 126 ++++++++++++++++++++++++++++++++
1 file changed, 126 insertions(+)
diff --git a/crates/fluss/tests/integration/table.rs b/crates/fluss/tests/integration/table.rs
index 3f7946e..006adcc 100644
--- a/crates/fluss/tests/integration/table.rs
+++ b/crates/fluss/tests/integration/table.rs
@@ -38,8 +38,11 @@ mod table_test {
use arrow::array::record_batch;
use fluss::metadata::{DataTypes, Schema, TableBucket, TableDescriptor,
TablePath};
use fluss::row::InternalRow;
+ use fluss::rpc::message::OffsetSpec;
+ use jiff::Timestamp;
use std::sync::Arc;
use std::thread;
+
fn before_all() {
// Create a new tokio runtime in a separate thread
let cluster_guard = SHARED_FLUSS_CLUSTER.clone();
@@ -256,4 +259,127 @@ mod table_test {
);
}
}
+
+    /// Integration test for `admin.list_offsets` on bucket 0 of a newly created table.
+    ///
+    /// Steps, in order:
+    /// 1. Create `fluss.test_list_offsets` with columns `id` (int) and `name` (string).
+    /// 2. Assert that the earliest and latest offsets are both 0 while the table is empty.
+    /// 3. Append one Arrow batch of three rows, recording wall-clock millisecond timestamps
+    ///    just before the append and roughly one second after it.
+    /// 4. Assert that the latest offset is now 3 and the earliest offset is still 0.
+    /// 5. Assert that the "before" timestamp resolves to offset 0 and the "after" timestamp
+    ///    resolves to offset 3.
+    ///
+    /// The test panics (and therefore fails) if any cluster call returns an error or any
+    /// assertion does not hold.
+    #[tokio::test]
+    async fn list_offsets() {
+        let cluster = get_fluss_cluster();
+        let connection = cluster.get_fluss_connection().await;
+
+        let admin = connection.get_admin().await.expect("Failed to get admin");
+
+        let table_path = TablePath::new("fluss".to_string(), "test_list_offsets".to_string());
+
+        let table_descriptor = TableDescriptor::builder()
+            .schema(
+                Schema::builder()
+                    .column("id", DataTypes::int())
+                    .column("name", DataTypes::string())
+                    .build()
+                    .expect("Failed to build schema"),
+            )
+            .build()
+            .expect("Failed to build table");
+
+        create_table(&admin, &table_path, &table_descriptor).await;
+
+        // Wait for table to be fully initialized
+        tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
+
+        // Test earliest offset (should be 0 for empty table)
+        let earliest_offsets = admin
+            .list_offsets(&table_path, &[0], OffsetSpec::Earliest)
+            .await
+            .expect("Failed to list earliest offsets");
+
+        assert_eq!(
+            earliest_offsets.get(&0),
+            Some(&0),
+            "Earliest offset should be 0 for bucket 0"
+        );
+
+        // Test latest offset (should be 0 for empty table)
+        let latest_offsets = admin
+            .list_offsets(&table_path, &[0], OffsetSpec::Latest)
+            .await
+            .expect("Failed to list latest offsets");
+
+        assert_eq!(
+            latest_offsets.get(&0),
+            Some(&0),
+            "Latest offset should be 0 for empty table"
+        );
+
+        // Captured before any record is written; used below for the timestamp lookup that is
+        // expected to land on the first appended record.
+        let before_append_ms = Timestamp::now().as_millisecond();
+
+        // Append some records
+        let append_writer = connection
+            .get_table(&table_path)
+            .await
+            .expect("Failed to get table")
+            .new_append()
+            .expect("Failed to create append")
+            .create_writer();
+
+        let batch = record_batch!(
+            ("id", Int32, [1, 2, 3]),
+            ("name", Utf8, ["alice", "bob", "charlie"])
+        )
+        .unwrap();
+        append_writer
+            .append_arrow_batch(batch)
+            .await
+            .expect("Failed to append batch");
+
+        // The pause puts `after_append_ms` about one second past the append.
+        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
+
+        let after_append_ms = Timestamp::now().as_millisecond();
+
+        // Test latest offset after appending (should be 3)
+        let latest_offsets_after = admin
+            .list_offsets(&table_path, &[0], OffsetSpec::Latest)
+            .await
+            .expect("Failed to list latest offsets after append");
+
+        assert_eq!(
+            latest_offsets_after.get(&0),
+            Some(&3),
+            "Latest offset should be 3 after appending 3 records"
+        );
+
+        // Test earliest offset after appending (should still be 0)
+        let earliest_offsets_after = admin
+            .list_offsets(&table_path, &[0], OffsetSpec::Earliest)
+            .await
+            .expect("Failed to list earliest offsets after append");
+
+        assert_eq!(
+            earliest_offsets_after.get(&0),
+            Some(&0),
+            "Earliest offset should still be 0"
+        );
+
+        // Test list_offsets with OffsetSpec::Timestamp.
+        // NOTE(review): the expected values below imply that a timestamp lookup returns the
+        // first offset at or after the timestamp, and falls back to the latest offset when no
+        // record is that new - confirm against the server-side semantics.
+
+        let timestamp_offsets = admin
+            .list_offsets(&table_path, &[0], OffsetSpec::Timestamp(before_append_ms))
+            .await
+            .expect("Failed to list offsets by timestamp");
+
+        assert_eq!(
+            timestamp_offsets.get(&0),
+            Some(&0),
+            "Timestamp before append should resolve to offset 0 (start of new data)"
+        );
+
+        let timestamp_offsets = admin
+            .list_offsets(&table_path, &[0], OffsetSpec::Timestamp(after_append_ms))
+            .await
+            .expect("Failed to list offsets by timestamp");
+
+        // The message now names the value actually asserted (3); it previously said 0.
+        assert_eq!(
+            timestamp_offsets.get(&0),
+            Some(&3),
+            "Timestamp after append should resolve to offset 3 (no newer records)"
+        );
+    }
}