This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 670bb5e chore: add integration tests for subscribe_batch and
project_by_name (#116)
670bb5e is described below
commit 670bb5ee40ff21ed34832d460c5fc92d9a5f2cd9
Author: Andrea Bozzo <[email protected]>
AuthorDate: Fri Jan 2 09:54:09 2026 +0100
chore: add integration tests for subscribe_batch and project_by_name (#116)
---------
Co-authored-by: luoyuxia <[email protected]>
---
bindings/cpp/src/lib.rs | 2 +-
crates/fluss/src/client/table/scanner.rs | 6 +-
crates/fluss/tests/integration/table.rs | 243 +++++++++++++++++++++----------
3 files changed, 169 insertions(+), 82 deletions(-)
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index cd1803b..2d37763 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -625,7 +625,7 @@ impl LogScanner {
bucket_offsets.insert(sub.bucket_id, sub.offset);
}
- let result = RUNTIME.block_on(async {
self.inner.subscribe_batch(bucket_offsets).await });
+ let result = RUNTIME.block_on(async {
self.inner.subscribe_batch(&bucket_offsets).await });
match result {
Ok(_) => ok_result(),
diff --git a/crates/fluss/src/client/table/scanner.rs
b/crates/fluss/src/client/table/scanner.rs
index bf39839..0acaac8 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -223,7 +223,7 @@ impl LogScanner {
Ok(())
}
- pub async fn subscribe_batch(&self, bucket_offsets: HashMap<i32, i64>) ->
Result<()> {
+ pub async fn subscribe_batch(&self, bucket_offsets: &HashMap<i32, i64>) ->
Result<()> {
self.metadata
.check_and_update_table_metadata(from_ref(&self.table_path))
.await?;
@@ -236,8 +236,8 @@ impl LogScanner {
let mut scan_bucket_offsets = HashMap::new();
for (bucket_id, offset) in bucket_offsets {
- let table_bucket = TableBucket::new(self.table_id, bucket_id);
- scan_bucket_offsets.insert(table_bucket, offset);
+ let table_bucket = TableBucket::new(self.table_id, *bucket_id);
+ scan_bucket_offsets.insert(table_bucket, *offset);
}
self.log_scanner_status
diff --git a/crates/fluss/tests/integration/table.rs
b/crates/fluss/tests/integration/table.rs
index 006adcc..0ac34c7 100644
--- a/crates/fluss/tests/integration/table.rs
+++ b/crates/fluss/tests/integration/table.rs
@@ -36,12 +36,16 @@ mod table_test {
use crate::integration::fluss_cluster::{FlussTestingCluster,
FlussTestingClusterBuilder};
use crate::integration::utils::create_table;
use arrow::array::record_batch;
+ use fluss::client::{FlussTable, TableScan};
use fluss::metadata::{DataTypes, Schema, TableBucket, TableDescriptor,
TablePath};
+ use fluss::record::ScanRecord;
use fluss::row::InternalRow;
use fluss::rpc::message::OffsetSpec;
use jiff::Timestamp;
+ use std::collections::HashMap;
use std::sync::Arc;
use std::thread;
+ use std::time::Duration;
fn before_all() {
// Create a new tokio runtime in a separate thread
@@ -137,6 +141,11 @@ mod table_test {
append_writer.flush().await.expect("Failed to flush");
+ // Create scanner to verify appended records
+ let table = connection
+ .get_table(&table_path)
+ .await
+ .expect("Failed to get table");
let num_buckets = table.table_info().get_num_buckets();
let log_scanner = table
.new_scan()
@@ -149,84 +158,6 @@ mod table_test {
.expect("Failed to subscribe");
}
- let scan_records = log_scanner
- .poll(std::time::Duration::from_secs(60))
- .await
- .expect("Failed to poll");
-
- let mut records: Vec<_> = scan_records.into_iter().collect();
- records.sort_by_key(|r| r.offset());
-
- assert_eq!(records.len(), 6, "Should have 6 records");
- for (i, record) in records.iter().enumerate() {
- let row = record.row();
- let expected_c1 = (i + 1) as i32;
- let expected_c2 = format!("a{}", i + 1);
- assert_eq!(row.get_int(0), expected_c1, "c1 mismatch at index {}",
i);
- assert_eq!(row.get_string(1), expected_c2, "c2 mismatch at index
{}", i);
- }
-
- let log_scanner_projected = table
- .new_scan()
- .project(&[1, 0])
- .expect("Failed to project")
- .create_log_scanner()
- .expect("Failed to create log scanner");
- for bucket_id in 0..num_buckets {
- log_scanner_projected
- .subscribe(bucket_id, 0)
- .await
- .expect("Failed to subscribe");
- }
-
- let scan_records_projected = log_scanner_projected
- .poll(std::time::Duration::from_secs(10))
- .await
- .expect("Failed to poll");
-
- let mut records_projected: Vec<_> =
scan_records_projected.into_iter().collect();
- records_projected.sort_by_key(|r| r.offset());
-
- assert_eq!(
- records_projected.len(),
- 6,
- "Should have 6 records with projection"
- );
- for (i, record) in records_projected.iter().enumerate() {
- let row = record.row();
- let expected_c2 = format!("a{}", i + 1);
- let expected_c1 = (i + 1) as i32;
- assert_eq!(
- row.get_string(0),
- expected_c2,
- "Projected c2 (first column) mismatch at index {}",
- i
- );
- assert_eq!(
- row.get_int(1),
- expected_c1,
- "Projected c1 (second column) mismatch at index {}",
- i
- );
- }
-
- // Create scanner to verify appended records
- let table = connection
- .get_table(&table_path)
- .await
- .expect("Failed to get table");
-
- let table_scan = table.new_scan();
- let log_scanner = table_scan
- .create_log_scanner()
- .expect("Failed to create log scanner");
-
- // Subscribe to bucket 0 starting from offset 0
- log_scanner
- .subscribe(0, 0)
- .await
- .expect("Failed to subscribe to bucket");
-
// Poll for records
let scan_records = log_scanner
.poll(tokio::time::Duration::from_secs(10))
@@ -382,4 +313,160 @@ mod table_test {
"Timestamp after append should resolve to offset 0 (no newer
records)"
);
}
+
+ #[tokio::test]
+ async fn test_project() {
+ let cluster = get_fluss_cluster();
+ let connection = cluster.get_fluss_connection().await;
+
+ let admin = connection.get_admin().await.expect("Failed to get admin");
+
+ let table_path = TablePath::new("fluss".to_string(),
"test_project".to_string());
+
+ let table_descriptor = TableDescriptor::builder()
+ .schema(
+ Schema::builder()
+ .column("col_a", DataTypes::int())
+ .column("col_b", DataTypes::string())
+ .column("col_c", DataTypes::int())
+ .build()
+ .expect("Failed to build schema"),
+ )
+ .build()
+ .expect("Failed to build table");
+
+ create_table(&admin, &table_path, &table_descriptor).await;
+
+ let table = connection
+ .get_table(&table_path)
+ .await
+ .expect("Failed to get table");
+
+ // Append 3 records
+ let append_writer = table
+ .new_append()
+ .expect("Failed to create append")
+ .create_writer();
+
+ let batch = record_batch!(
+ ("col_a", Int32, [1, 2, 3]),
+ ("col_b", Utf8, ["x", "y", "z"]),
+ ("col_c", Int32, [10, 20, 30])
+ )
+ .unwrap();
+ append_writer
+ .append_arrow_batch(batch)
+ .await
+ .expect("Failed to append batch");
+ append_writer.flush().await.expect("Failed to flush");
+
+ // Test project_by_name: select col_b and col_c only
+ let records = scan_table(&table, |scan| {
+ scan.project_by_name(&["col_b", "col_c"])
+ .expect("Failed to project by name")
+ })
+ .await;
+
+ assert_eq!(
+ records.len(),
+ 3,
+ "Should have 3 records with project_by_name"
+ );
+
+ // Verify projected columns are in the correct order (col_b, col_c)
+ let expected_col_b = ["x", "y", "z"];
+ let expected_col_c = [10, 20, 30];
+
+ for (i, record) in records.iter().enumerate() {
+ let row = record.row();
+ // col_b is now at index 0, col_c is at index 1
+ assert_eq!(
+ row.get_string(0),
+ expected_col_b[i],
+ "col_b mismatch at index {}",
+ i
+ );
+ assert_eq!(
+ row.get_int(1),
+ expected_col_c[i],
+ "col_c mismatch at index {}",
+ i
+ );
+ }
+
+ // test project by column indices
+ let records = scan_table(&table, |scan| {
+ scan.project(&[1, 0]).expect("Failed to project by indices")
+ })
+ .await;
+
+ assert_eq!(
+ records.len(),
+ 3,
+ "Should have 3 records with project_by_name"
+ );
+ // Verify projected columns are in the correct order (col_b, col_a)
+ let expected_col_b = ["x", "y", "z"];
+ let expected_col_a = [1, 2, 3];
+
+ for (i, record) in records.iter().enumerate() {
+ let row = record.row();
+ // col_b is now at index 0, col_a is at index 1
+ assert_eq!(
+ row.get_string(0),
+ expected_col_b[i],
+ "col_b mismatch at index {}",
+ i
+ );
+ assert_eq!(
+ row.get_int(1),
+ expected_col_a[i],
+ "col_c mismatch at index {}",
+ i
+ );
+ }
+
+ // Test error case: empty column names should fail
+ let result = table.new_scan().project_by_name(&[]);
+ assert!(
+ result.is_err(),
+ "project_by_name with empty names should fail"
+ );
+
+ // Test error case: non-existent column should fail
+ let result = table.new_scan().project_by_name(&["nonexistent_column"]);
+ assert!(
+ result.is_err(),
+ "project_by_name with non-existent column should fail"
+ );
+ }
+
+ async fn scan_table<'a>(
+ table: &FlussTable<'a>,
+ setup_scan: impl FnOnce(TableScan) -> TableScan,
+ ) -> Vec<ScanRecord> {
+ // 1. build log scanner
+ let log_scanner = setup_scan(table.new_scan())
+ .create_log_scanner()
+ .expect("Failed to create log scanner");
+
+ // 2. subscribe
+ let mut bucket_offsets = HashMap::new();
+ bucket_offsets.insert(0, 0);
+ log_scanner
+ .subscribe_batch(&bucket_offsets)
+ .await
+ .expect("Failed to subscribe");
+
+ // 3. poll records
+ let scan_records = log_scanner
+ .poll(Duration::from_secs(10))
+ .await
+ .expect("Failed to poll");
+
+ // 4. collect and sort
+ let mut records: Vec<_> = scan_records.into_iter().collect();
+ records.sort_by_key(|r| r.offset());
+ records
+ }
}