This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 6c6687b  chore: add integration test for scan records after append (#51)
6c6687b is described below

commit 6c6687b548996f5d56c36d9aa63f677933e099e7
Author: Pavlos-Petros Tournaris <[email protected]>
AuthorDate: Mon Nov 24 04:48:37 2025 +0200

    chore: add integration test for scan records after append (#51)
---
 crates/fluss/tests/integration/table.rs | 50 +++++++++++++++++++++++++++++++--
 1 file changed, 48 insertions(+), 2 deletions(-)

diff --git a/crates/fluss/tests/integration/table.rs b/crates/fluss/tests/integration/table.rs
index a1a6cb2..aa02724 100644
--- a/crates/fluss/tests/integration/table.rs
+++ b/crates/fluss/tests/integration/table.rs
@@ -36,7 +36,8 @@ mod table_test {
     use crate::integration::fluss_cluster::{FlussTestingCluster, FlussTestingClusterBuilder};
     use crate::integration::utils::create_table;
     use arrow::array::record_batch;
-    use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
+    use fluss::metadata::{DataTypes, Schema, TableBucket, TableDescriptor, TablePath};
+    use fluss::row::InternalRow;
     use std::sync::Arc;
     use std::sync::atomic::AtomicUsize;
     use std::thread;
@@ -127,6 +128,51 @@ mod table_test {
             .await
             .expect("Failed to append batch");
 
-        // todo: add scan code to verify the records appended in #30
+        // Create scanner to verify appended records
+        let table = connection
+            .get_table(&table_path)
+            .await
+            .expect("Failed to get table");
+
+        let table_scan = table.new_scan();
+        let log_scanner = table_scan.create_log_scanner();
+
+        // Subscribe to bucket 0 starting from offset 0
+        log_scanner
+            .subscribe(0, 0)
+            .await
+            .expect("Failed to subscribe to bucket");
+
+        // Poll for records
+        let scan_records = log_scanner
+            .poll(tokio::time::Duration::from_secs(5))
+            .await
+            .expect("Failed to poll records");
+
+        // Verify the scanned records
+        let table_bucket = TableBucket::new(table.table_info().table_id, 0);
+        let records = scan_records.records(&table_bucket);
+
+        assert_eq!(records.len(), 6, "Expected 6 records");
+
+        // Verify record contents match what was appended
+        let expected_c1_values = vec![1, 2, 3, 4, 5, 6];
+        let expected_c2_values = vec!["a1", "a2", "a3", "a4", "a5", "a6"];
+
+        for (i, record) in records.iter().enumerate() {
+            let row = record.row();
+            assert_eq!(
+                row.get_int(0),
+                expected_c1_values[i],
+                "c1 value mismatch at row {}",
+                i
+            );
+            assert_eq!(
+                row.get_string(1),
+                expected_c2_values[i],
+                "c2 value mismatch at row {}",
+                i
+            );
+        }
     }
 }

Reply via email to