This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch unsubscribe-partition
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git

commit ec6b68cd1ff906aa58e471a6c8c6d2965c17a41a
Author: luoyuxia <[email protected]>
AuthorDate: Fri Feb 6 07:08:43 2026 +0800

    feat: support unsubscribe partition
---
 bindings/cpp/include/fluss.hpp           |  1 +
 bindings/cpp/src/lib.rs                  | 21 +++++++++++++++++++++
 bindings/cpp/src/table.cpp               |  9 +++++++++
 crates/fluss/src/client/table/scanner.rs | 29 +++++++++++++++++++++++++++++
 4 files changed, 60 insertions(+)

diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp
index d35ece2..a6dfd69 100644
--- a/bindings/cpp/include/fluss.hpp
+++ b/bindings/cpp/include/fluss.hpp
@@ -545,6 +545,7 @@ public:
     Result Subscribe(int32_t bucket_id, int64_t start_offset);
     Result Subscribe(const std::vector<BucketSubscription>& bucket_offsets);
     Result SubscribePartition(int64_t partition_id, int32_t bucket_id, int64_t start_offset);
+    Result UnsubscribePartition(int64_t partition_id, int32_t bucket_id);
     Result Poll(int64_t timeout_ms, ScanRecords& out);
     Result PollRecordBatch(int64_t timeout_ms, ArrowRecordBatches& out);
 
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index ab02c8d..bb51b29 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -255,6 +255,7 @@ mod ffi {
             bucket_id: i32,
             start_offset: i64,
         ) -> FfiResult;
+        fn unsubscribe_partition(self: &LogScanner, partition_id: i64, bucket_id: i32) -> FfiResult;
         fn poll(self: &LogScanner, timeout_ms: i64) -> FfiScanRecordsResult;
         fn poll_record_batch(self: &LogScanner, timeout_ms: i64) -> FfiArrowRecordBatchesResult;
         fn free_arrow_ffi_structures(array_ptr: usize, schema_ptr: usize);
@@ -825,6 +826,26 @@ impl LogScanner {
         self.do_subscribe(Some(partition_id), bucket_id, start_offset)
     }
 
+    fn unsubscribe_partition(&self, partition_id: PartitionId, bucket_id: i32) -> ffi::FfiResult {
+        if let Some(ref inner) = self.inner {
+            match RUNTIME.block_on(async {
+                inner.unsubscribe_partition(partition_id, bucket_id).await
+            }) {
+                Ok(_) => ok_result(),
+                Err(e) => err_result(1, e.to_string()),
+            }
+        } else if let Some(ref inner_batch) = self.inner_batch {
+            match RUNTIME.block_on(async {
+                inner_batch.unsubscribe_partition(partition_id, bucket_id).await
+            }) {
+                Ok(_) => ok_result(),
+                Err(e) => err_result(1, e.to_string()),
+            }
+        } else {
+            err_result(1, "LogScanner not initialized".to_string())
+        }
+    }
+
     fn poll(&self, timeout_ms: i64) -> ffi::FfiScanRecordsResult {
         if let Some(ref inner) = self.inner {
             let timeout = Duration::from_millis(timeout_ms as u64);
diff --git a/bindings/cpp/src/table.cpp b/bindings/cpp/src/table.cpp
index b327dba..c7d9105 100644
--- a/bindings/cpp/src/table.cpp
+++ b/bindings/cpp/src/table.cpp
@@ -274,6 +274,15 @@ Result LogScanner::SubscribePartition(int64_t partition_id, int32_t bucket_id, i
     return utils::from_ffi_result(ffi_result);
 }
 
+Result LogScanner::UnsubscribePartition(int64_t partition_id, int32_t bucket_id) {
+    if (!Available()) {
+        return utils::make_error(1, "LogScanner not available");
+    }
+
+    auto ffi_result = scanner_->unsubscribe_partition(partition_id, bucket_id);
+    return utils::from_ffi_result(ffi_result);
+}
+
 Result LogScanner::Poll(int64_t timeout_ms, ScanRecords& out) {
     if (!Available()) {
         return utils::make_error(1, "LogScanner not available");
diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs
index aa9fca4..bd52565 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -409,6 +409,19 @@ impl LogScannerInner {
         Ok(())
     }
 
+    async fn unsubscribe_partition(&self, partition_id: PartitionId, bucket: i32) -> Result<()> {
+        if !self.is_partitioned_table {
+            return Err(Error::UnsupportedOperation {
+                message: "Can't unsubscribe a partition for a non-partitioned table.".to_string(),
+            });
+        }
+        let table_bucket =
+            TableBucket::new_with_partition(self.table_id, Some(partition_id), bucket);
+        self.log_scanner_status
+            .unassign_scan_buckets(from_ref(&table_bucket));
+        Ok(())
+    }
+
     async fn poll_for_fetches(&self) -> Result<HashMap<TableBucket, Vec<ScanRecord>>> {
         let result = self.log_fetcher.collect_fetches()?;
         if !result.is_empty() {
@@ -487,6 +500,14 @@ impl LogScanner {
             .subscribe_partition(partition_id, bucket, offset)
             .await
     }
+
+    pub async fn unsubscribe_partition(
+        &self,
+        partition_id: PartitionId,
+        bucket: i32,
+    ) -> Result<()> {
+        self.inner.unsubscribe_partition(partition_id, bucket).await
+    }
 }
 
 // Implementation for RecordBatchLogScanner (batches mode)
@@ -514,6 +535,14 @@ impl RecordBatchLogScanner {
             .subscribe_partition(partition_id, bucket, offset)
             .await
     }
+
+    pub async fn unsubscribe_partition(
+        &self,
+        partition_id: PartitionId,
+        bucket: i32,
+    ) -> Result<()> {
+        self.inner.unsubscribe_partition(partition_id, bucket).await
+    }
 }
 
 struct LogFetcher {

Reply via email to