This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 6c6687b chore: add integration test for scan records after append
(#51)
6c6687b is described below
commit 6c6687b548996f5d56c36d9aa63f677933e099e7
Author: Pavlos-Petros Tournaris <[email protected]>
AuthorDate: Mon Nov 24 04:48:37 2025 +0200
chore: add integration test for scan records after append (#51)
---
crates/fluss/tests/integration/table.rs | 50 +++++++++++++++++++++++++++++++--
1 file changed, 48 insertions(+), 2 deletions(-)
diff --git a/crates/fluss/tests/integration/table.rs
b/crates/fluss/tests/integration/table.rs
index a1a6cb2..aa02724 100644
--- a/crates/fluss/tests/integration/table.rs
+++ b/crates/fluss/tests/integration/table.rs
@@ -36,7 +36,8 @@ mod table_test {
use crate::integration::fluss_cluster::{FlussTestingCluster,
FlussTestingClusterBuilder};
use crate::integration::utils::create_table;
use arrow::array::record_batch;
- use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
+ use fluss::metadata::{DataTypes, Schema, TableBucket, TableDescriptor,
TablePath};
+ use fluss::row::InternalRow;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::thread;
@@ -127,6 +128,51 @@ mod table_test {
.await
.expect("Failed to append batch");
- // todo: add scan code to verify the records appended in #30
+ // Create scanner to verify appended records
+ let table = connection
+ .get_table(&table_path)
+ .await
+ .expect("Failed to get table");
+
+ let table_scan = table.new_scan();
+ let log_scanner = table_scan.create_log_scanner();
+
+ // Subscribe to bucket 0 starting from offset 0
+ log_scanner
+ .subscribe(0, 0)
+ .await
+ .expect("Failed to subscribe to bucket");
+
+ // Poll for records
+ let scan_records = log_scanner
+ .poll(tokio::time::Duration::from_secs(5))
+ .await
+ .expect("Failed to poll records");
+
+ // Verify the scanned records
+ let table_bucket = TableBucket::new(table.table_info().table_id, 0);
+ let records = scan_records.records(&table_bucket);
+
+ assert_eq!(records.len(), 6, "Expected 6 records");
+
+ // Verify record contents match what was appended
+ let expected_c1_values = vec![1, 2, 3, 4, 5, 6];
+ let expected_c2_values = vec!["a1", "a2", "a3", "a4", "a5", "a6"];
+
+ for (i, record) in records.iter().enumerate() {
+ let row = record.row();
+ assert_eq!(
+ row.get_int(0),
+ expected_c1_values[i],
+ "c1 value mismatch at row {}",
+ i
+ );
+ assert_eq!(
+ row.get_string(1),
+ expected_c2_values[i],
+ "c2 value mismatch at row {}",
+ i
+ );
+ }
}
}