This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 670bb5e  chore: add integration tests for subscribe_batch and 
project_by_name (#116)
670bb5e is described below

commit 670bb5ee40ff21ed34832d460c5fc92d9a5f2cd9
Author: Andrea Bozzo <[email protected]>
AuthorDate: Fri Jan 2 09:54:09 2026 +0100

    chore: add integration tests for subscribe_batch and project_by_name (#116)
    
    ---------
    
    Co-authored-by: luoyuxia <[email protected]>
---
 bindings/cpp/src/lib.rs                  |   2 +-
 crates/fluss/src/client/table/scanner.rs |   6 +-
 crates/fluss/tests/integration/table.rs  | 243 +++++++++++++++++++++----------
 3 files changed, 169 insertions(+), 82 deletions(-)

diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index cd1803b..2d37763 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -625,7 +625,7 @@ impl LogScanner {
             bucket_offsets.insert(sub.bucket_id, sub.offset);
         }
 
-        let result = RUNTIME.block_on(async { 
self.inner.subscribe_batch(bucket_offsets).await });
+        let result = RUNTIME.block_on(async { 
self.inner.subscribe_batch(&bucket_offsets).await });
 
         match result {
             Ok(_) => ok_result(),
diff --git a/crates/fluss/src/client/table/scanner.rs 
b/crates/fluss/src/client/table/scanner.rs
index bf39839..0acaac8 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -223,7 +223,7 @@ impl LogScanner {
         Ok(())
     }
 
-    pub async fn subscribe_batch(&self, bucket_offsets: HashMap<i32, i64>) -> 
Result<()> {
+    pub async fn subscribe_batch(&self, bucket_offsets: &HashMap<i32, i64>) -> 
Result<()> {
         self.metadata
             .check_and_update_table_metadata(from_ref(&self.table_path))
             .await?;
@@ -236,8 +236,8 @@ impl LogScanner {
 
         let mut scan_bucket_offsets = HashMap::new();
         for (bucket_id, offset) in bucket_offsets {
-            let table_bucket = TableBucket::new(self.table_id, bucket_id);
-            scan_bucket_offsets.insert(table_bucket, offset);
+            let table_bucket = TableBucket::new(self.table_id, *bucket_id);
+            scan_bucket_offsets.insert(table_bucket, *offset);
         }
 
         self.log_scanner_status
diff --git a/crates/fluss/tests/integration/table.rs 
b/crates/fluss/tests/integration/table.rs
index 006adcc..0ac34c7 100644
--- a/crates/fluss/tests/integration/table.rs
+++ b/crates/fluss/tests/integration/table.rs
@@ -36,12 +36,16 @@ mod table_test {
     use crate::integration::fluss_cluster::{FlussTestingCluster, 
FlussTestingClusterBuilder};
     use crate::integration::utils::create_table;
     use arrow::array::record_batch;
+    use fluss::client::{FlussTable, TableScan};
     use fluss::metadata::{DataTypes, Schema, TableBucket, TableDescriptor, 
TablePath};
+    use fluss::record::ScanRecord;
     use fluss::row::InternalRow;
     use fluss::rpc::message::OffsetSpec;
     use jiff::Timestamp;
+    use std::collections::HashMap;
     use std::sync::Arc;
     use std::thread;
+    use std::time::Duration;
 
     fn before_all() {
         // Create a new tokio runtime in a separate thread
@@ -137,6 +141,11 @@ mod table_test {
 
         append_writer.flush().await.expect("Failed to flush");
 
+        // Create scanner to verify appended records
+        let table = connection
+            .get_table(&table_path)
+            .await
+            .expect("Failed to get table");
         let num_buckets = table.table_info().get_num_buckets();
         let log_scanner = table
             .new_scan()
@@ -149,84 +158,6 @@ mod table_test {
                 .expect("Failed to subscribe");
         }
 
-        let scan_records = log_scanner
-            .poll(std::time::Duration::from_secs(60))
-            .await
-            .expect("Failed to poll");
-
-        let mut records: Vec<_> = scan_records.into_iter().collect();
-        records.sort_by_key(|r| r.offset());
-
-        assert_eq!(records.len(), 6, "Should have 6 records");
-        for (i, record) in records.iter().enumerate() {
-            let row = record.row();
-            let expected_c1 = (i + 1) as i32;
-            let expected_c2 = format!("a{}", i + 1);
-            assert_eq!(row.get_int(0), expected_c1, "c1 mismatch at index {}", 
i);
-            assert_eq!(row.get_string(1), expected_c2, "c2 mismatch at index 
{}", i);
-        }
-
-        let log_scanner_projected = table
-            .new_scan()
-            .project(&[1, 0])
-            .expect("Failed to project")
-            .create_log_scanner()
-            .expect("Failed to create log scanner");
-        for bucket_id in 0..num_buckets {
-            log_scanner_projected
-                .subscribe(bucket_id, 0)
-                .await
-                .expect("Failed to subscribe");
-        }
-
-        let scan_records_projected = log_scanner_projected
-            .poll(std::time::Duration::from_secs(10))
-            .await
-            .expect("Failed to poll");
-
-        let mut records_projected: Vec<_> = 
scan_records_projected.into_iter().collect();
-        records_projected.sort_by_key(|r| r.offset());
-
-        assert_eq!(
-            records_projected.len(),
-            6,
-            "Should have 6 records with projection"
-        );
-        for (i, record) in records_projected.iter().enumerate() {
-            let row = record.row();
-            let expected_c2 = format!("a{}", i + 1);
-            let expected_c1 = (i + 1) as i32;
-            assert_eq!(
-                row.get_string(0),
-                expected_c2,
-                "Projected c2 (first column) mismatch at index {}",
-                i
-            );
-            assert_eq!(
-                row.get_int(1),
-                expected_c1,
-                "Projected c1 (second column) mismatch at index {}",
-                i
-            );
-        }
-
-        // Create scanner to verify appended records
-        let table = connection
-            .get_table(&table_path)
-            .await
-            .expect("Failed to get table");
-
-        let table_scan = table.new_scan();
-        let log_scanner = table_scan
-            .create_log_scanner()
-            .expect("Failed to create log scanner");
-
-        // Subscribe to bucket 0 starting from offset 0
-        log_scanner
-            .subscribe(0, 0)
-            .await
-            .expect("Failed to subscribe to bucket");
-
         // Poll for records
         let scan_records = log_scanner
             .poll(tokio::time::Duration::from_secs(10))
@@ -382,4 +313,160 @@ mod table_test {
             "Timestamp after append should resolve to offset 0 (no newer 
records)"
         );
     }
+
+    #[tokio::test]
+    async fn test_project() {
+        let cluster = get_fluss_cluster();
+        let connection = cluster.get_fluss_connection().await;
+
+        let admin = connection.get_admin().await.expect("Failed to get admin");
+
+        let table_path = TablePath::new("fluss".to_string(), 
"test_project".to_string());
+
+        let table_descriptor = TableDescriptor::builder()
+            .schema(
+                Schema::builder()
+                    .column("col_a", DataTypes::int())
+                    .column("col_b", DataTypes::string())
+                    .column("col_c", DataTypes::int())
+                    .build()
+                    .expect("Failed to build schema"),
+            )
+            .build()
+            .expect("Failed to build table");
+
+        create_table(&admin, &table_path, &table_descriptor).await;
+
+        let table = connection
+            .get_table(&table_path)
+            .await
+            .expect("Failed to get table");
+
+        // Append 3 records
+        let append_writer = table
+            .new_append()
+            .expect("Failed to create append")
+            .create_writer();
+
+        let batch = record_batch!(
+            ("col_a", Int32, [1, 2, 3]),
+            ("col_b", Utf8, ["x", "y", "z"]),
+            ("col_c", Int32, [10, 20, 30])
+        )
+        .unwrap();
+        append_writer
+            .append_arrow_batch(batch)
+            .await
+            .expect("Failed to append batch");
+        append_writer.flush().await.expect("Failed to flush");
+
+        // Test project_by_name: select col_b and col_c only
+        let records = scan_table(&table, |scan| {
+            scan.project_by_name(&["col_b", "col_c"])
+                .expect("Failed to project by name")
+        })
+        .await;
+
+        assert_eq!(
+            records.len(),
+            3,
+            "Should have 3 records with project_by_name"
+        );
+
+        // Verify projected columns are in the correct order (col_b, col_c)
+        let expected_col_b = ["x", "y", "z"];
+        let expected_col_c = [10, 20, 30];
+
+        for (i, record) in records.iter().enumerate() {
+            let row = record.row();
+            // col_b is now at index 0, col_c is at index 1
+            assert_eq!(
+                row.get_string(0),
+                expected_col_b[i],
+                "col_b mismatch at index {}",
+                i
+            );
+            assert_eq!(
+                row.get_int(1),
+                expected_col_c[i],
+                "col_c mismatch at index {}",
+                i
+            );
+        }
+
+        // Test project by column indices
+        let records = scan_table(&table, |scan| {
+            scan.project(&[1, 0]).expect("Failed to project by indices")
+        })
+        .await;
+
+        assert_eq!(
+            records.len(),
+            3,
+            "Should have 3 records with project by indices"
+        );
+        // Verify projected columns are in the correct order (col_b, col_a)
+        let expected_col_b = ["x", "y", "z"];
+        let expected_col_a = [1, 2, 3];
+
+        for (i, record) in records.iter().enumerate() {
+            let row = record.row();
+            // col_b is now at index 0, col_a is at index 1
+            assert_eq!(
+                row.get_string(0),
+                expected_col_b[i],
+                "col_b mismatch at index {}",
+                i
+            );
+            assert_eq!(
+                row.get_int(1),
+                expected_col_a[i],
+                "col_a mismatch at index {}",
+                i
+            );
+        }
+
+        // Test error case: empty column names should fail
+        let result = table.new_scan().project_by_name(&[]);
+        assert!(
+            result.is_err(),
+            "project_by_name with empty names should fail"
+        );
+
+        // Test error case: non-existent column should fail
+        let result = table.new_scan().project_by_name(&["nonexistent_column"]);
+        assert!(
+            result.is_err(),
+            "project_by_name with non-existent column should fail"
+        );
+    }
+
+    async fn scan_table<'a>(
+        table: &FlussTable<'a>,
+        setup_scan: impl FnOnce(TableScan) -> TableScan,
+    ) -> Vec<ScanRecord> {
+        // 1. build log scanner
+        let log_scanner = setup_scan(table.new_scan())
+            .create_log_scanner()
+            .expect("Failed to create log scanner");
+
+        // 2. subscribe
+        let mut bucket_offsets = HashMap::new();
+        bucket_offsets.insert(0, 0);
+        log_scanner
+            .subscribe_batch(&bucket_offsets)
+            .await
+            .expect("Failed to subscribe");
+
+        // 3. poll records
+        let scan_records = log_scanner
+            .poll(Duration::from_secs(10))
+            .await
+            .expect("Failed to poll");
+
+        // 4. collect and sort
+        let mut records: Vec<_> = scan_records.into_iter().collect();
+        records.sort_by_key(|r| r.offset());
+        records
+    }
 }

Reply via email to