This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 9af12ea feat: support unsubscribe partition (#257)
9af12ea is described below
commit 9af12ea197f6d90baa3b71320a6cef7cec60bb11
Author: yuxia Luo <[email protected]>
AuthorDate: Fri Feb 6 09:36:07 2026 +0800
feat: support unsubscribe partition (#257)
---
bindings/cpp/include/fluss.hpp | 1 +
bindings/cpp/src/lib.rs | 24 ++++++++++++
bindings/cpp/src/table.cpp | 9 +++++
crates/fluss/src/client/table/scanner.rs | 29 +++++++++++++++
crates/fluss/tests/integration/log_table.rs | 58 +++++++++++++++++++++++++++++
5 files changed, 121 insertions(+)
diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp
index 901b90c..3ff9a26 100644
--- a/bindings/cpp/include/fluss.hpp
+++ b/bindings/cpp/include/fluss.hpp
@@ -564,6 +564,7 @@ public:
Result Subscribe(int32_t bucket_id, int64_t start_offset);
Result Subscribe(const std::vector<BucketSubscription>& bucket_offsets);
Result SubscribePartition(int64_t partition_id, int32_t bucket_id, int64_t start_offset);
+ Result UnsubscribePartition(int64_t partition_id, int32_t bucket_id);
Result Poll(int64_t timeout_ms, ScanRecords& out);
Result PollRecordBatch(int64_t timeout_ms, ArrowRecordBatches& out);
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index b834865..d6e3a9a 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -255,6 +255,8 @@ mod ffi {
bucket_id: i32,
start_offset: i64,
) -> FfiResult;
+ fn unsubscribe_partition(self: &LogScanner, partition_id: i64, bucket_id: i32)
+ -> FfiResult;
fn poll(self: &LogScanner, timeout_ms: i64) -> FfiScanRecordsResult;
fn poll_record_batch(self: &LogScanner, timeout_ms: i64) -> FfiArrowRecordBatchesResult;
fn free_arrow_ffi_structures(array_ptr: usize, schema_ptr: usize);
@@ -825,6 +827,28 @@ impl LogScanner {
self.do_subscribe(Some(partition_id), bucket_id, start_offset)
}
+ fn unsubscribe_partition(&self, partition_id: PartitionId, bucket_id: i32) -> ffi::FfiResult {
+ if let Some(ref inner) = self.inner {
+ match RUNTIME
+ .block_on(async { inner.unsubscribe_partition(partition_id, bucket_id).await })
+ {
+ Ok(_) => ok_result(),
+ Err(e) => err_result(1, e.to_string()),
+ }
+ } else if let Some(ref inner_batch) = self.inner_batch {
+ match RUNTIME.block_on(async {
+ inner_batch
+ .unsubscribe_partition(partition_id, bucket_id)
+ .await
+ }) {
+ Ok(_) => ok_result(),
+ Err(e) => err_result(1, e.to_string()),
+ }
+ } else {
+ err_result(1, "LogScanner not initialized".to_string())
+ }
+ }
+
fn poll(&self, timeout_ms: i64) -> ffi::FfiScanRecordsResult {
if let Some(ref inner) = self.inner {
let timeout = Duration::from_millis(timeout_ms as u64);
diff --git a/bindings/cpp/src/table.cpp b/bindings/cpp/src/table.cpp
index ab26038..efb762b 100644
--- a/bindings/cpp/src/table.cpp
+++ b/bindings/cpp/src/table.cpp
@@ -264,6 +264,15 @@ Result LogScanner::SubscribePartition(int64_t partition_id, int32_t bucket_id, i
return utils::from_ffi_result(ffi_result);
}
+Result LogScanner::UnsubscribePartition(int64_t partition_id, int32_t bucket_id) {
+ if (!Available()) {
+ return utils::make_error(1, "LogScanner not available");
+ }
+
+ auto ffi_result = scanner_->unsubscribe_partition(partition_id, bucket_id);
+ return utils::from_ffi_result(ffi_result);
+}
+
Result LogScanner::Poll(int64_t timeout_ms, ScanRecords& out) {
if (!Available()) {
return utils::make_error(1, "LogScanner not available");
diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs
index d50f19e..26f54da 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -409,6 +409,19 @@ impl LogScannerInner {
Ok(())
}
+ async fn unsubscribe_partition(&self, partition_id: PartitionId, bucket: i32) -> Result<()> {
+ if !self.is_partitioned_table {
+ return Err(Error::UnsupportedOperation {
+ message: "Can't unsubscribe a partition for a non-partitioned table.".to_string(),
+ });
+ }
+ let table_bucket =
+ TableBucket::new_with_partition(self.table_id, Some(partition_id), bucket);
+ self.log_scanner_status
+ .unassign_scan_buckets(from_ref(&table_bucket));
+ Ok(())
+ }
+
async fn poll_for_fetches(&self) -> Result<HashMap<TableBucket, Vec<ScanRecord>>> {
let result = self.log_fetcher.collect_fetches()?;
if !result.is_empty() {
@@ -487,6 +500,14 @@ impl LogScanner {
.subscribe_partition(partition_id, bucket, offset)
.await
}
+
+ pub async fn unsubscribe_partition(
+ &self,
+ partition_id: PartitionId,
+ bucket: i32,
+ ) -> Result<()> {
+ self.inner.unsubscribe_partition(partition_id, bucket).await
+ }
}
// Implementation for RecordBatchLogScanner (batches mode)
@@ -524,6 +545,14 @@ impl RecordBatchLogScanner {
pub fn get_subscribed_buckets(&self) -> Vec<(TableBucket, i64)> {
self.inner.log_scanner_status.get_all_subscriptions()
}
+
+ pub async fn unsubscribe_partition(
+ &self,
+ partition_id: PartitionId,
+ bucket: i32,
+ ) -> Result<()> {
+ self.inner.unsubscribe_partition(partition_id, bucket).await
+ }
}
struct LogFetcher {
diff --git a/crates/fluss/tests/integration/log_table.rs b/crates/fluss/tests/integration/log_table.rs
index 3f7dd6e..493bb34 100644
--- a/crates/fluss/tests/integration/log_table.rs
+++ b/crates/fluss/tests/integration/log_table.rs
@@ -1160,6 +1160,64 @@ mod table_test {
"Data mismatch between sent and received"
);
+ // Test unsubscribe_partition: after unsubscribing from one partition,
+ // data from that partition should no longer be read.
+ let log_scanner_unsub = table
+ .new_scan()
+ .create_log_scanner()
+ .expect("Failed to create log scanner for unsubscribe test");
+ let partition_infos = admin
+ .list_partition_infos(&table_path)
+ .await
+ .expect("Failed to list partition infos");
+ let eu_partition_id = partition_infos
+ .iter()
+ .find(|p| p.get_partition_name() == "EU")
+ .map(|p| p.get_partition_id())
+ .expect("EU partition should exist");
+ for info in &partition_infos {
+ log_scanner_unsub
+ .subscribe_partition(info.get_partition_id(), 0, 0)
+ .await
+ .expect("Failed to subscribe to partition");
+ }
+ log_scanner_unsub
+ .unsubscribe_partition(eu_partition_id, 0)
+ .await
+ .expect("Failed to unsubscribe from EU partition");
+
+ let mut records_after_unsubscribe: Vec<(i32, String, i64)> = Vec::new();
+ let unsub_deadline = std::time::Instant::now() + Duration::from_secs(5);
+ while records_after_unsubscribe.len() < 4 && std::time::Instant::now() < unsub_deadline {
+ let records = log_scanner_unsub
+ .poll(Duration::from_millis(300))
+ .await
+ .expect("Failed to poll after unsubscribe");
+ for rec in records {
+ let row = rec.row();
+ records_after_unsubscribe.push((
+ row.get_int(0),
+ row.get_string(1).to_string(),
+ row.get_long(2),
+ ));
+ }
+ }
+
+ assert!(
+ records_after_unsubscribe.iter().all(|r| r.1 == "US"),
+ "After unsubscribe_partition(EU), only US partition data should be read; got regions: {:?}",
+ records_after_unsubscribe
+ .iter()
+ .map(|r| r.1.as_str())
+ .collect::<Vec<_>>()
+ );
+ assert_eq!(
+ records_after_unsubscribe.len(),
+ 4,
+ "Should receive exactly 4 US records (ids 1,2,5,6); got {}",
+ records_after_unsubscribe.len()
+ );
+
admin
.drop_table(&table_path, false)
.await