This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new ff879b9 feat: Add unsubscribe_partition to python bindings (#277)
ff879b9 is described below
commit ff879b9a4c5fb6c52600a61a1bb43fdcb59cd6cf
Author: Anton Borisov <[email protected]>
AuthorDate: Sun Feb 8 00:35:38 2026 +0000
feat: Add unsubscribe_partition to python bindings (#277)
---
bindings/python/example/example.py | 13 +++++++++++++
bindings/python/fluss/__init__.pyi | 8 ++++++++
bindings/python/src/table.rs | 17 +++++++++++++++++
3 files changed, 38 insertions(+)
diff --git a/bindings/python/example/example.py b/bindings/python/example/example.py
index d56879a..dd7f1b1 100644
--- a/bindings/python/example/example.py
+++ b/bindings/python/example/example.py
@@ -717,6 +717,19 @@ async def main():
print(f"\nto_arrow() returned {partitioned_arrow.num_rows} records from partitioned table:")
print(partitioned_arrow.to_pandas())
+ # Demo: unsubscribe_partition - unsubscribe from one partition, read remaining
+ print("\n--- Testing unsubscribe_partition ---")
+ partitioned_scanner3 = await partitioned_table.new_scan().create_batch_scanner()
+ for p in partition_infos:
+ partitioned_scanner3.subscribe_partition(p.partition_id, 0, fluss.EARLIEST_OFFSET)
+ # Unsubscribe from the first partition
+ first_partition = partition_infos[0]
+ partitioned_scanner3.unsubscribe_partition(first_partition.partition_id, 0)
+ print(f"Unsubscribed from partition {first_partition.partition_name} (id={first_partition.partition_id})")
+ remaining_arrow = partitioned_scanner3.to_arrow()
+ print(f"After unsubscribe, to_arrow() returned {remaining_arrow.num_rows} records (from remaining partitions):")
+ print(remaining_arrow.to_pandas())
+
# Demo: to_pandas() also works for partitioned tables
print("\n--- Testing to_pandas() on partitioned table ---")
partitioned_scanner2 = await partitioned_table.new_scan().create_batch_scanner()
diff --git a/bindings/python/fluss/__init__.pyi b/bindings/python/fluss/__init__.pyi
index a2bbaac..526dad7 100644
--- a/bindings/python/fluss/__init__.pyi
+++ b/bindings/python/fluss/__init__.pyi
@@ -420,6 +420,14 @@ class LogScanner:
start_offset: The offset to start reading from (use EARLIEST_OFFSET for beginning)
"""
...
+ def unsubscribe_partition(self, partition_id: int, bucket_id: int) -> None:
+ """Unsubscribe from a specific partition bucket (partitioned tables only).
+
+ Args:
+ partition_id: The partition ID to unsubscribe from
+ bucket_id: The bucket ID within the partition
+ """
+ ...
def poll(self, timeout_ms: int) -> List[ScanRecord]:
"""Poll for individual records with metadata.
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index 8af6b13..1a7dbdc 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -1657,6 +1657,23 @@ impl LogScanner {
})
}
+ /// Unsubscribe from a specific partition bucket (partitioned tables only).
+ ///
+ /// Args:
+ /// partition_id: The partition ID to unsubscribe from
+ /// bucket_id: The bucket ID within the partition
+ fn unsubscribe_partition(&self, py: Python, partition_id: i64, bucket_id: i32) -> PyResult<()> {
+ py.detach(|| {
+ TOKIO_RUNTIME.block_on(async {
+ with_scanner!(
+ &self.scanner,
+ unsubscribe_partition(partition_id, bucket_id)
+ )
+ .map_err(|e| FlussError::new_err(e.to_string()))
+ })
+ })
+ }
+
/// Poll for individual records with metadata.
///
/// Args: