pavlospt commented on code in PR #52:
URL: https://github.com/apache/fluss-rust/pull/52#discussion_r2644032352


##########
crates/fluss/tests/integration/table.rs:
##########
@@ -252,4 +253,111 @@ mod table_test {
             );
         }
     }
+
+    #[tokio::test]
+    async fn list_offsets() {
+        let cluster = get_fluss_cluster();
+        let connection = cluster.get_fluss_connection().await;
+
+        let admin = connection.get_admin().await.expect("Failed to get admin");
+
+        let table_path = TablePath::new("fluss".to_string(), 
"test_list_offsets".to_string());
+
+        let table_descriptor = TableDescriptor::builder()
+            .schema(
+                Schema::builder()
+                    .column("id", DataTypes::int())
+                    .column("name", DataTypes::string())
+                    .build()
+                    .expect("Failed to build schema"),
+            )
+            .build()
+            .expect("Failed to build table");
+
+        create_table(&admin, &table_path, &table_descriptor).await;
+
+        // Wait for table to be fully initialized
+        tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
+
+        // Test earliest offset (should be 0 for empty table)
+        let earliest_offsets = admin
+            .list_offsets(&table_path, &[0], OffsetSpec::Earliest)
+            .await
+            .expect("Failed to list earliest offsets");
+
+        assert_eq!(
+            earliest_offsets.get(&0),
+            Some(&0),
+            "Earliest offset should be 0 for bucket 0"
+        );
+
+        // Test latest offset (should be 0 for empty table)
+        let latest_offsets = admin
+            .list_offsets(&table_path, &[0], OffsetSpec::Latest)
+            .await
+            .expect("Failed to list latest offsets");
+
+        assert_eq!(
+            latest_offsets.get(&0),
+            Some(&0),
+            "Latest offset should be 0 for empty table"
+        );
+
+        // Append some records
+        let append_writer = connection
+            .get_table(&table_path)
+            .await
+            .expect("Failed to get table")
+            .new_append()
+            .expect("Failed to create append")
+            .create_writer();
+
+        let batch = record_batch!(
+            ("id", Int32, [1, 2, 3]),
+            ("name", Utf8, ["alice", "bob", "charlie"])
+        )
+        .unwrap();
+        append_writer
+            .append_arrow_batch(batch)
+            .await
+            .expect("Failed to append batch");
+
+        // Wait for records to be committed
+        tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
+
+        // Test latest offset after appending (should be 3)
+        let latest_offsets_after = admin
+            .list_offsets(&table_path, &[0], OffsetSpec::Latest)
+            .await
+            .expect("Failed to list latest offsets after append");
+
+        assert_eq!(
+            latest_offsets_after.get(&0),
+            Some(&3),
+            "Latest offset should be 3 after appending 3 records"
+        );
+
+        // Test earliest offset after appending (should still be 0)
+        let earliest_offsets_after = admin
+            .list_offsets(&table_path, &[0], OffsetSpec::Earliest)
+            .await
+            .expect("Failed to list earliest offsets after append");
+
+        assert_eq!(
+            earliest_offsets_after.get(&0),
+            Some(&0),
+            "Earliest offset should still be 0"
+        );
+
+        // Test with multiple buckets

Review Comment:
   Updated! 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to