This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch unsubscribe-partition
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git

commit ec6b68cd1ff906aa58e471a6c8c6d2965c17a41a
Author: luoyuxia <[email protected]>
AuthorDate: Fri Feb 6 07:08:43 2026 +0800

    feat: support unsubscribe partition
---
 bindings/cpp/include/fluss.hpp           |  1 +
 bindings/cpp/src/lib.rs                  | 21 +++++++++++++++++++++
 bindings/cpp/src/table.cpp               |  9 +++++++++
 crates/fluss/src/client/table/scanner.rs | 29 +++++++++++++++++++++++++++++
 4 files changed, 60 insertions(+)

diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp
index d35ece2..a6dfd69 100644
--- a/bindings/cpp/include/fluss.hpp
+++ b/bindings/cpp/include/fluss.hpp
@@ -545,6 +545,7 @@ public:
     Result Subscribe(int32_t bucket_id, int64_t start_offset);
     Result Subscribe(const std::vector<BucketSubscription>& bucket_offsets);
     Result SubscribePartition(int64_t partition_id, int32_t bucket_id, int64_t start_offset);
+    Result UnsubscribePartition(int64_t partition_id, int32_t bucket_id);
 
     Result Poll(int64_t timeout_ms, ScanRecords& out);
     Result PollRecordBatch(int64_t timeout_ms, ArrowRecordBatches& out);
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index ab02c8d..bb51b29 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -255,6 +255,7 @@ mod ffi {
             bucket_id: i32,
             start_offset: i64,
         ) -> FfiResult;
+        fn unsubscribe_partition(self: &LogScanner, partition_id: i64, bucket_id: i32) -> FfiResult;
         fn poll(self: &LogScanner, timeout_ms: i64) -> FfiScanRecordsResult;
         fn poll_record_batch(self: &LogScanner, timeout_ms: i64) -> FfiArrowRecordBatchesResult;
         fn free_arrow_ffi_structures(array_ptr: usize, schema_ptr: usize);
@@ -825,6 +826,26 @@ impl LogScanner {
         self.do_subscribe(Some(partition_id), bucket_id, start_offset)
     }
 
+    fn unsubscribe_partition(&self, partition_id: PartitionId, bucket_id: i32) -> ffi::FfiResult {
+        if let Some(ref inner) = self.inner {
+            match RUNTIME.block_on(async {
+                inner.unsubscribe_partition(partition_id, bucket_id).await
+            }) {
+                Ok(_) => ok_result(),
+                Err(e) => err_result(1, e.to_string()),
+            }
+        } else if let Some(ref inner_batch) = self.inner_batch {
+            match RUNTIME.block_on(async {
+                inner_batch.unsubscribe_partition(partition_id, bucket_id).await
+            }) {
+                Ok(_) => ok_result(),
+                Err(e) => err_result(1, e.to_string()),
+            }
+        } else {
+            err_result(1, "LogScanner not initialized".to_string())
+        }
+    }
+
     fn poll(&self, timeout_ms: i64) -> ffi::FfiScanRecordsResult {
         if let Some(ref inner) = self.inner {
             let timeout = Duration::from_millis(timeout_ms as u64);
diff --git a/bindings/cpp/src/table.cpp b/bindings/cpp/src/table.cpp
index b327dba..c7d9105 100644
--- a/bindings/cpp/src/table.cpp
+++ b/bindings/cpp/src/table.cpp
@@ -274,6 +274,15 @@ Result LogScanner::SubscribePartition(int64_t partition_id, int32_t bucket_id, i
     return utils::from_ffi_result(ffi_result);
 }
 
+Result LogScanner::UnsubscribePartition(int64_t partition_id, int32_t bucket_id) {
+    if (!Available()) {
+        return utils::make_error(1, "LogScanner not available");
+    }
+
+    auto ffi_result = scanner_->unsubscribe_partition(partition_id, bucket_id);
+    return utils::from_ffi_result(ffi_result);
+}
+
 Result LogScanner::Poll(int64_t timeout_ms, ScanRecords& out) {
     if (!Available()) {
         return utils::make_error(1, "LogScanner not available");
diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs
index aa9fca4..bd52565 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -409,6 +409,19 @@ impl LogScannerInner {
         Ok(())
     }
 
+    async fn unsubscribe_partition(&self, partition_id: PartitionId, bucket: i32) -> Result<()> {
+        if !self.is_partitioned_table {
+            return Err(Error::UnsupportedOperation {
+                message: "Can't unsubscribe a partition for a non-partitioned table.".to_string(),
+            });
+        }
+        let table_bucket =
+            TableBucket::new_with_partition(self.table_id, Some(partition_id), bucket);
+        self.log_scanner_status
+            .unassign_scan_buckets(from_ref(&table_bucket));
+        Ok(())
+    }
+
     async fn poll_for_fetches(&self) -> Result<HashMap<TableBucket, Vec<ScanRecord>>> {
         let result = self.log_fetcher.collect_fetches()?;
         if !result.is_empty() {
@@ -487,6 +500,14 @@ impl LogScanner {
             .subscribe_partition(partition_id, bucket, offset)
             .await
     }
+
+    pub async fn unsubscribe_partition(
+        &self,
+        partition_id: PartitionId,
+        bucket: i32,
+    ) -> Result<()> {
+        self.inner.unsubscribe_partition(partition_id, bucket).await
+    }
 }
 
 // Implementation for RecordBatchLogScanner (batches mode)
@@ -514,6 +535,14 @@ impl RecordBatchLogScanner {
             .subscribe_partition(partition_id, bucket, offset)
             .await
     }
+
+    pub async fn unsubscribe_partition(
+        &self,
+        partition_id: PartitionId,
+        bucket: i32,
+    ) -> Result<()> {
+        self.inner.unsubscribe_partition(partition_id, bucket).await
+    }
 }
 
 struct LogFetcher {
