This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 9af12ea  feat: support unsubscribe partition (#257)
9af12ea is described below

commit 9af12ea197f6d90baa3b71320a6cef7cec60bb11
Author: yuxia Luo <[email protected]>
AuthorDate: Fri Feb 6 09:36:07 2026 +0800

    feat: support unsubscribe partition (#257)
---
 bindings/cpp/include/fluss.hpp              |  1 +
 bindings/cpp/src/lib.rs                     | 24 ++++++++++++
 bindings/cpp/src/table.cpp                  |  9 +++++
 crates/fluss/src/client/table/scanner.rs    | 29 +++++++++++++++
 crates/fluss/tests/integration/log_table.rs | 58 +++++++++++++++++++++++++++++
 5 files changed, 121 insertions(+)

diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp
index 901b90c..3ff9a26 100644
--- a/bindings/cpp/include/fluss.hpp
+++ b/bindings/cpp/include/fluss.hpp
@@ -564,6 +564,7 @@ public:
     Result Subscribe(int32_t bucket_id, int64_t start_offset);
     Result Subscribe(const std::vector<BucketSubscription>& bucket_offsets);
     Result SubscribePartition(int64_t partition_id, int32_t bucket_id, int64_t start_offset);
+    Result UnsubscribePartition(int64_t partition_id, int32_t bucket_id);
     Result Poll(int64_t timeout_ms, ScanRecords& out);
     Result PollRecordBatch(int64_t timeout_ms, ArrowRecordBatches& out);
 
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index b834865..d6e3a9a 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -255,6 +255,8 @@ mod ffi {
             bucket_id: i32,
             start_offset: i64,
         ) -> FfiResult;
+        fn unsubscribe_partition(self: &LogScanner, partition_id: i64, bucket_id: i32)
+        -> FfiResult;
         fn poll(self: &LogScanner, timeout_ms: i64) -> FfiScanRecordsResult;
         fn poll_record_batch(self: &LogScanner, timeout_ms: i64) -> FfiArrowRecordBatchesResult;
         fn free_arrow_ffi_structures(array_ptr: usize, schema_ptr: usize);
@@ -825,6 +827,28 @@ impl LogScanner {
         self.do_subscribe(Some(partition_id), bucket_id, start_offset)
     }
 
+    fn unsubscribe_partition(&self, partition_id: PartitionId, bucket_id: i32) -> ffi::FfiResult {
+        if let Some(ref inner) = self.inner {
+            match RUNTIME
+                .block_on(async { inner.unsubscribe_partition(partition_id, bucket_id).await })
+            {
+                Ok(_) => ok_result(),
+                Err(e) => err_result(1, e.to_string()),
+            }
+        } else if let Some(ref inner_batch) = self.inner_batch {
+            match RUNTIME.block_on(async {
+                inner_batch
+                    .unsubscribe_partition(partition_id, bucket_id)
+                    .await
+            }) {
+                Ok(_) => ok_result(),
+                Err(e) => err_result(1, e.to_string()),
+            }
+        } else {
+            err_result(1, "LogScanner not initialized".to_string())
+        }
+    }
+
     fn poll(&self, timeout_ms: i64) -> ffi::FfiScanRecordsResult {
         if let Some(ref inner) = self.inner {
             let timeout = Duration::from_millis(timeout_ms as u64);
diff --git a/bindings/cpp/src/table.cpp b/bindings/cpp/src/table.cpp
index ab26038..efb762b 100644
--- a/bindings/cpp/src/table.cpp
+++ b/bindings/cpp/src/table.cpp
@@ -264,6 +264,15 @@ Result LogScanner::SubscribePartition(int64_t partition_id, int32_t bucket_id, i
     return utils::from_ffi_result(ffi_result);
 }
 
+Result LogScanner::UnsubscribePartition(int64_t partition_id, int32_t bucket_id) {
+    if (!Available()) {
+        return utils::make_error(1, "LogScanner not available");
+    }
+
+    auto ffi_result = scanner_->unsubscribe_partition(partition_id, bucket_id);
+    return utils::from_ffi_result(ffi_result);
+}
+
 Result LogScanner::Poll(int64_t timeout_ms, ScanRecords& out) {
     if (!Available()) {
         return utils::make_error(1, "LogScanner not available");
diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs
index d50f19e..26f54da 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -409,6 +409,19 @@ impl LogScannerInner {
         Ok(())
     }
 
+    async fn unsubscribe_partition(&self, partition_id: PartitionId, bucket: i32) -> Result<()> {
+        if !self.is_partitioned_table {
+            return Err(Error::UnsupportedOperation {
+                message: "Can't unsubscribe a partition for a non-partitioned table.".to_string(),
+            });
+        }
+        let table_bucket =
+            TableBucket::new_with_partition(self.table_id, Some(partition_id), bucket);
+        self.log_scanner_status
+            .unassign_scan_buckets(from_ref(&table_bucket));
+        Ok(())
+    }
+
     async fn poll_for_fetches(&self) -> Result<HashMap<TableBucket, Vec<ScanRecord>>> {
         let result = self.log_fetcher.collect_fetches()?;
         if !result.is_empty() {
@@ -487,6 +500,14 @@ impl LogScanner {
             .subscribe_partition(partition_id, bucket, offset)
             .await
     }
+
+    pub async fn unsubscribe_partition(
+        &self,
+        partition_id: PartitionId,
+        bucket: i32,
+    ) -> Result<()> {
+        self.inner.unsubscribe_partition(partition_id, bucket).await
+    }
 }
 
 // Implementation for RecordBatchLogScanner (batches mode)
@@ -524,6 +545,14 @@ impl RecordBatchLogScanner {
     pub fn get_subscribed_buckets(&self) -> Vec<(TableBucket, i64)> {
         self.inner.log_scanner_status.get_all_subscriptions()
     }
+
+    pub async fn unsubscribe_partition(
+        &self,
+        partition_id: PartitionId,
+        bucket: i32,
+    ) -> Result<()> {
+        self.inner.unsubscribe_partition(partition_id, bucket).await
+    }
 }
 
 struct LogFetcher {
diff --git a/crates/fluss/tests/integration/log_table.rs b/crates/fluss/tests/integration/log_table.rs
index 3f7dd6e..493bb34 100644
--- a/crates/fluss/tests/integration/log_table.rs
+++ b/crates/fluss/tests/integration/log_table.rs
@@ -1160,6 +1160,64 @@ mod table_test {
             "Data mismatch between sent and received"
         );
 
+        // Test unsubscribe_partition: after unsubscribing from one partition,
+        // data from that partition should no longer be read.
+        let log_scanner_unsub = table
+            .new_scan()
+            .create_log_scanner()
+            .expect("Failed to create log scanner for unsubscribe test");
+        let partition_infos = admin
+            .list_partition_infos(&table_path)
+            .await
+            .expect("Failed to list partition infos");
+        let eu_partition_id = partition_infos
+            .iter()
+            .find(|p| p.get_partition_name() == "EU")
+            .map(|p| p.get_partition_id())
+            .expect("EU partition should exist");
+        for info in &partition_infos {
+            log_scanner_unsub
+                .subscribe_partition(info.get_partition_id(), 0, 0)
+                .await
+                .expect("Failed to subscribe to partition");
+        }
+        log_scanner_unsub
+            .unsubscribe_partition(eu_partition_id, 0)
+            .await
+            .expect("Failed to unsubscribe from EU partition");
+
+        let mut records_after_unsubscribe: Vec<(i32, String, i64)> = Vec::new();
+        let unsub_deadline = std::time::Instant::now() + Duration::from_secs(5);
+        while records_after_unsubscribe.len() < 4 && std::time::Instant::now() < unsub_deadline {
+            let records = log_scanner_unsub
+                .poll(Duration::from_millis(300))
+                .await
+                .expect("Failed to poll after unsubscribe");
+            for rec in records {
+                let row = rec.row();
+                records_after_unsubscribe.push((
+                    row.get_int(0),
+                    row.get_string(1).to_string(),
+                    row.get_long(2),
+                ));
+            }
+        }
+
+        assert!(
+            records_after_unsubscribe.iter().all(|r| r.1 == "US"),
+            "After unsubscribe_partition(EU), only US partition data should be read; got regions: {:?}",
+            records_after_unsubscribe
+                .iter()
+                .map(|r| r.1.as_str())
+                .collect::<Vec<_>>()
+        );
+        assert_eq!(
+            records_after_unsubscribe.len(),
+            4,
+            "Should receive exactly 4 US records (ids 1,2,5,6); got {}",
+            records_after_unsubscribe.len()
+        );
+
         admin
             .drop_table(&table_path, false)
             .await

Reply via email to