This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new ff879b9  feat: Add unsubscribe_partition to python bindings (#277)
ff879b9 is described below

commit ff879b9a4c5fb6c52600a61a1bb43fdcb59cd6cf
Author: Anton Borisov <[email protected]>
AuthorDate: Sun Feb 8 00:35:38 2026 +0000

    feat: Add unsubscribe_partition to python bindings (#277)
---
 bindings/python/example/example.py | 13 +++++++++++++
 bindings/python/fluss/__init__.pyi |  8 ++++++++
 bindings/python/src/table.rs       | 17 +++++++++++++++++
 3 files changed, 38 insertions(+)

diff --git a/bindings/python/example/example.py b/bindings/python/example/example.py
index d56879a..dd7f1b1 100644
--- a/bindings/python/example/example.py
+++ b/bindings/python/example/example.py
@@ -717,6 +717,19 @@ async def main():
         print(f"\nto_arrow() returned {partitioned_arrow.num_rows} records from partitioned table:")
         print(partitioned_arrow.to_pandas())
 
+        # Demo: unsubscribe_partition - unsubscribe from one partition, read remaining
+        print("\n--- Testing unsubscribe_partition ---")
+        partitioned_scanner3 = await partitioned_table.new_scan().create_batch_scanner()
+        for p in partition_infos:
+            partitioned_scanner3.subscribe_partition(p.partition_id, 0, fluss.EARLIEST_OFFSET)
+        # Unsubscribe from the first partition
+        first_partition = partition_infos[0]
+        partitioned_scanner3.unsubscribe_partition(first_partition.partition_id, 0)
+        print(f"Unsubscribed from partition {first_partition.partition_name} (id={first_partition.partition_id})")
+        remaining_arrow = partitioned_scanner3.to_arrow()
+        print(f"After unsubscribe, to_arrow() returned {remaining_arrow.num_rows} records (from remaining partitions):")
+        print(remaining_arrow.to_pandas())
+
         # Demo: to_pandas() also works for partitioned tables
         print("\n--- Testing to_pandas() on partitioned table ---")
         partitioned_scanner2 = await partitioned_table.new_scan().create_batch_scanner()
diff --git a/bindings/python/fluss/__init__.pyi b/bindings/python/fluss/__init__.pyi
index a2bbaac..526dad7 100644
--- a/bindings/python/fluss/__init__.pyi
+++ b/bindings/python/fluss/__init__.pyi
@@ -420,6 +420,14 @@ class LogScanner:
             start_offset: The offset to start reading from (use EARLIEST_OFFSET for beginning)
         """
         ...
+    def unsubscribe_partition(self, partition_id: int, bucket_id: int) -> None:
+        """Unsubscribe from a specific partition bucket (partitioned tables only).
+
+        Args:
+            partition_id: The partition ID to unsubscribe from
+            bucket_id: The bucket ID within the partition
+        """
+        ...
     def poll(self, timeout_ms: int) -> List[ScanRecord]:
         """Poll for individual records with metadata.
 
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index 8af6b13..1a7dbdc 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -1657,6 +1657,23 @@ impl LogScanner {
         })
     }
 
+    /// Unsubscribe from a specific partition bucket (partitioned tables only).
+    ///
+    /// Args:
+    ///     partition_id: The partition ID to unsubscribe from
+    ///     bucket_id: The bucket ID within the partition
+    fn unsubscribe_partition(&self, py: Python, partition_id: i64, bucket_id: i32) -> PyResult<()> {
+        py.detach(|| {
+            TOKIO_RUNTIME.block_on(async {
+                with_scanner!(
+                    &self.scanner,
+                    unsubscribe_partition(partition_id, bucket_id)
+                )
+                .map_err(|e| FlussError::new_err(e.to_string()))
+            })
+        })
+    }
+
     /// Poll for individual records with metadata.
     ///
     /// Args:

Reply via email to