This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new db72701  chore: add integration test for list offsets (#52)
db72701 is described below

commit db727017c71acb147bb36eda366bb18c3c831b96
Author: Pavlos-Petros Tournaris <[email protected]>
AuthorDate: Thu Dec 25 16:47:42 2025 +0200

    chore: add integration test for list offsets (#52)
    
    ---------
    
    Co-authored-by: luoyuxia <[email protected]>
---
 crates/fluss/tests/integration/table.rs | 126 ++++++++++++++++++++++++++++++++
 1 file changed, 126 insertions(+)

diff --git a/crates/fluss/tests/integration/table.rs 
b/crates/fluss/tests/integration/table.rs
index 3f7946e..006adcc 100644
--- a/crates/fluss/tests/integration/table.rs
+++ b/crates/fluss/tests/integration/table.rs
@@ -38,8 +38,11 @@ mod table_test {
     use arrow::array::record_batch;
     use fluss::metadata::{DataTypes, Schema, TableBucket, TableDescriptor, 
TablePath};
     use fluss::row::InternalRow;
+    use fluss::rpc::message::OffsetSpec;
+    use jiff::Timestamp;
     use std::sync::Arc;
     use std::thread;
+
     fn before_all() {
         // Create a new tokio runtime in a separate thread
         let cluster_guard = SHARED_FLUSS_CLUSTER.clone();
@@ -256,4 +259,127 @@ mod table_test {
             );
         }
     }
+
+    #[tokio::test]
+    async fn list_offsets() {
+        let cluster = get_fluss_cluster();
+        let connection = cluster.get_fluss_connection().await;
+
+        let admin = connection.get_admin().await.expect("Failed to get admin");
+
+        let table_path = TablePath::new("fluss".to_string(), 
"test_list_offsets".to_string());
+
+        let table_descriptor = TableDescriptor::builder()
+            .schema(
+                Schema::builder()
+                    .column("id", DataTypes::int())
+                    .column("name", DataTypes::string())
+                    .build()
+                    .expect("Failed to build schema"),
+            )
+            .build()
+            .expect("Failed to build table");
+
+        create_table(&admin, &table_path, &table_descriptor).await;
+
+        // Wait for table to be fully initialized
+        tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
+
+        // Test earliest offset (should be 0 for empty table)
+        let earliest_offsets = admin
+            .list_offsets(&table_path, &[0], OffsetSpec::Earliest)
+            .await
+            .expect("Failed to list earliest offsets");
+
+        assert_eq!(
+            earliest_offsets.get(&0),
+            Some(&0),
+            "Earliest offset should be 0 for bucket 0"
+        );
+
+        // Test latest offset (should be 0 for empty table)
+        let latest_offsets = admin
+            .list_offsets(&table_path, &[0], OffsetSpec::Latest)
+            .await
+            .expect("Failed to list latest offsets");
+
+        assert_eq!(
+            latest_offsets.get(&0),
+            Some(&0),
+            "Latest offset should be 0 for empty table"
+        );
+
+        let before_append_ms = Timestamp::now().as_millisecond();
+
+        // Append some records
+        let append_writer = connection
+            .get_table(&table_path)
+            .await
+            .expect("Failed to get table")
+            .new_append()
+            .expect("Failed to create append")
+            .create_writer();
+
+        let batch = record_batch!(
+            ("id", Int32, [1, 2, 3]),
+            ("name", Utf8, ["alice", "bob", "charlie"])
+        )
+        .unwrap();
+        append_writer
+            .append_arrow_batch(batch)
+            .await
+            .expect("Failed to append batch");
+
+        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
+
+        let after_append_ms = Timestamp::now().as_millisecond();
+
+        // Test latest offset after appending (should be 3)
+        let latest_offsets_after = admin
+            .list_offsets(&table_path, &[0], OffsetSpec::Latest)
+            .await
+            .expect("Failed to list latest offsets after append");
+
+        assert_eq!(
+            latest_offsets_after.get(&0),
+            Some(&3),
+            "Latest offset should be 3 after appending 3 records"
+        );
+
+        // Test earliest offset after appending (should still be 0)
+        let earliest_offsets_after = admin
+            .list_offsets(&table_path, &[0], OffsetSpec::Earliest)
+            .await
+            .expect("Failed to list earliest offsets after append");
+
+        assert_eq!(
+            earliest_offsets_after.get(&0),
+            Some(&0),
+            "Earliest offset should still be 0"
+        );
+
+        // Test list_offsets_by_timestamp
+
+        let timestamp_offsets = admin
+            .list_offsets(&table_path, &[0], 
OffsetSpec::Timestamp(before_append_ms))
+            .await
+            .expect("Failed to list offsets by timestamp");
+
+        assert_eq!(
+            timestamp_offsets.get(&0),
+            Some(&0),
+            "Timestamp before append should resolve to offset 0 (start of new 
data)"
+        );
+
+        let timestamp_offsets = admin
+            .list_offsets(&table_path, &[0], 
OffsetSpec::Timestamp(after_append_ms))
+            .await
+            .expect("Failed to list offsets by timestamp");
+
+        assert_eq!(
+            timestamp_offsets.get(&0),
+            Some(&3),
+            "Timestamp after append should resolve to offset 0 (no newer 
records)"
+        );
+    }
 }

Reply via email to