This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 6de2cde  feat: Support drop_table, partitions and offsets methods in 
python bindings (#150)
6de2cde is described below

commit 6de2cde9ed9ea9f0ece9cc246af285587485f342
Author: Anton Borisov <[email protected]>
AuthorDate: Thu Feb 5 23:27:55 2026 +0000

    feat: Support drop_table, partitions and offsets methods in python bindings 
(#150)
---
 bindings/python/example/example.py       | 202 ++++++++++++--
 bindings/python/fluss/__init__.pyi       | 127 ++++++++-
 bindings/python/src/admin.rs             | 253 ++++++++++++++++-
 bindings/python/src/lib.rs               |  22 ++
 bindings/python/src/table.rs             | 464 ++++++++++++++++++++++---------
 crates/fluss/src/client/table/scanner.rs |  20 ++
 6 files changed, 924 insertions(+), 164 deletions(-)

diff --git a/bindings/python/example/example.py 
b/bindings/python/example/example.py
index 9cb8f43..8735038 100644
--- a/bindings/python/example/example.py
+++ b/bindings/python/example/example.py
@@ -87,6 +87,19 @@ async def main():
     except Exception as e:
         print(f"Failed to get table info: {e}")
 
+    # Demo: List offsets
+    print("\n--- Testing list_offsets() ---")
+    try:
+        # Query latest offsets using OffsetType constant (recommended for type 
safety)
+        offsets = await admin.list_offsets(
+            table_path,
+            bucket_ids=[0],
+            offset_type=fluss.OffsetType.LATEST
+        )
+        print(f"Latest offsets for table (before writes): {offsets}")
+    except Exception as e:
+        print(f"Failed to list offsets: {e}")
+
     # Get the table instance
     table = await conn.get_table(table_path)
     print(f"Got table: {table}")
@@ -96,7 +109,7 @@ async def main():
     print(f"Created append writer: {append_writer}")
 
     try:
-        # Test 1: Write PyArrow Table
+        # Demo: Write PyArrow Table
         print("\n--- Testing PyArrow Table write ---")
         pa_table = pa.Table.from_arrays(
             [
@@ -139,7 +152,7 @@ async def main():
         append_writer.write_arrow(pa_table)
         print("Successfully wrote PyArrow Table")
 
-        # Test 2: Write PyArrow RecordBatch
+        # Demo: Write PyArrow RecordBatch
         print("\n--- Testing PyArrow RecordBatch write ---")
         pa_record_batch = pa.RecordBatch.from_arrays(
             [
@@ -202,7 +215,7 @@ async def main():
         )
         print("Successfully appended row (list with Date, Time, Timestamp, 
Decimal)")
 
-        # Test 4: Write Pandas DataFrame
+        # Demo: Write Pandas DataFrame
         print("\n--- Testing Pandas DataFrame write ---")
         df = pd.DataFrame(
             {
@@ -232,6 +245,19 @@ async def main():
         append_writer.flush()
         print("Successfully flushed data")
 
+        # Demo: Check offsets after writes
+        print("\n--- Checking offsets after writes ---")
+        try:
+            # Query with string constant (alternative API - both strings and 
constants are supported)
+            offsets = await admin.list_offsets(
+                table_path,
+                bucket_ids=[0],
+                offset_type="latest"  # Can also use "earliest" or "timestamp"
+            )
+            print(f"Latest offsets after writing 7 records: {offsets}")
+        except Exception as e:
+            print(f"Failed to list offsets: {e}")
+
     except Exception as e:
         print(f"Error during writing: {e}")
 
@@ -242,10 +268,13 @@ async def main():
         batch_scanner = await table.new_scan().create_batch_scanner()
         print(f"Created batch scanner: {batch_scanner}")
 
-        # Subscribe to scan from earliest to latest
-        # start_timestamp=None (earliest), end_timestamp=None (latest)
-        batch_scanner.subscribe(None, None)
+        # Subscribe to buckets (required before to_arrow/to_pandas)
+        # Use subscribe_buckets to subscribe all buckets from EARLIEST_OFFSET
+        num_buckets = (await admin.get_table(table_path)).num_buckets
+        batch_scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in 
range(num_buckets)})
+        print(f"Subscribed to {num_buckets} buckets from EARLIEST_OFFSET")
 
+        # Read all data using to_arrow()
         print("Scanning results using to_arrow():")
 
         # Try to get as PyArrow Table
@@ -255,13 +284,13 @@ async def main():
         except Exception as e:
             print(f"Could not convert to PyArrow: {e}")
 
-        # Let's subscribe from the beginning again.
-        # Reset subscription
-        batch_scanner.subscribe(None, None)
+        # Create a new batch scanner for to_pandas() test
+        batch_scanner2 = await table.new_scan().create_batch_scanner()
+        batch_scanner2.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in 
range(num_buckets)})
 
         # Try to get as Pandas DataFrame
         try:
-            df_result = batch_scanner.to_pandas()
+            df_result = batch_scanner2.to_pandas()
             print(f"\nAs Pandas DataFrame:\n{df_result}")
         except Exception as e:
             print(f"Could not convert to Pandas: {e}")
@@ -273,13 +302,14 @@ async def main():
 
         # Test poll_arrow() method for incremental reading as Arrow Table
         print("\n--- Testing poll_arrow() method ---")
-        # Reset subscription to start from the beginning
-        batch_scanner.subscribe(None, None)
+        batch_scanner3 = await table.new_scan().create_batch_scanner()
+        batch_scanner3.subscribe(bucket_id=0, 
start_offset=fluss.EARLIEST_OFFSET)
+        print(f"Subscribed to bucket 0 at EARLIEST_OFFSET 
({fluss.EARLIEST_OFFSET})")
 
         # Poll with a timeout of 5000ms (5 seconds)
         # Note: poll_arrow() returns an empty table (not an error) on timeout
         try:
-            poll_result = batch_scanner.poll_arrow(5000)
+            poll_result = batch_scanner3.poll_arrow(5000)
             print(f"Number of rows: {poll_result.num_rows}")
 
             if poll_result.num_rows > 0:
@@ -287,7 +317,7 @@ async def main():
                 print(f"Polled data:\n{poll_df}")
             else:
                 print("Empty result (no records available)")
-                # Empty table still has schema
+                # Empty table still has schema - this is useful!
                 print(f"Schema: {poll_result.schema}")
 
         except Exception as e:
@@ -295,10 +325,11 @@ async def main():
 
         # Test poll_batches() method for batches with metadata
         print("\n--- Testing poll_batches() method ---")
-        batch_scanner.subscribe(None, None)
+        batch_scanner4 = await table.new_scan().create_batch_scanner()
+        batch_scanner4.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in 
range(num_buckets)})
 
         try:
-            batches = batch_scanner.poll_batches(5000)
+            batches = batch_scanner4.poll_batches(5000)
             print(f"Number of batches: {len(batches)}")
 
             for i, batch in enumerate(batches):
@@ -319,7 +350,7 @@ async def main():
         record_scanner = await table.new_scan().create_log_scanner()
         print(f"Created record scanner: {record_scanner}")
 
-        record_scanner.subscribe(None, None)
+        record_scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in 
range(num_buckets)})
 
         # Poll returns List[ScanRecord] with per-record metadata
         print("\n--- Testing poll() method (record-by-record) ---")
@@ -539,10 +570,13 @@ async def main():
     # Demo: Column projection using builder pattern
     print("\n--- Testing Column Projection ---")
     try:
+        # Get bucket count for subscriptions
+        num_buckets = (await admin.get_table(table_path)).num_buckets
+
         # Project specific columns by index (using batch scanner for to_pandas)
         print("\n1. Projection by index [0, 1] (id, name):")
         scanner_index = await table.new_scan().project([0, 
1]).create_batch_scanner()
-        scanner_index.subscribe(None, None)
+        scanner_index.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in 
range(num_buckets)})
         df_projected = scanner_index.to_pandas()
         print(df_projected.head())
         print(
@@ -554,7 +588,7 @@ async def main():
         scanner_names = await table.new_scan() \
             .project_by_name(["name", "score"]) \
             .create_batch_scanner()
-        scanner_names.subscribe(None, None)
+        scanner_names.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in 
range(num_buckets)})
         df_named = scanner_names.to_pandas()
         print(df_named.head())
         print(f"   Projected {df_named.shape[1]} columns: 
{list(df_named.columns)}")
@@ -562,7 +596,7 @@ async def main():
         # Test empty result schema with projection
         print("\n3. Testing empty result schema with projection:")
         scanner_proj = await table.new_scan().project([0, 
2]).create_batch_scanner()
-        scanner_proj.subscribe(None, None)
+        scanner_proj.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in 
range(num_buckets)})
         # Quick poll that may return empty
         result = scanner_proj.poll_arrow(100)
         print(f"   Schema columns: {result.schema.names}")
@@ -570,6 +604,134 @@ async def main():
     except Exception as e:
         print(f"Error during projection: {e}")
 
+    # Demo: Drop tables
+    print("\n--- Testing drop_table() ---")
+    try:
+        # Drop the log table
+        await admin.drop_table(table_path, ignore_if_not_exists=True)
+        print(f"Successfully dropped table: {table_path}")
+        # Drop the PK table
+        await admin.drop_table(pk_table_path, ignore_if_not_exists=True)
+        print(f"Successfully dropped table: {pk_table_path}")
+    except Exception as e:
+        print(f"Failed to drop table: {e}")
+
+    # =====================================================
+    # Demo: Partitioned Table with list_partition_offsets
+    # =====================================================
+    print("\n" + "=" * 60)
+    print("--- Testing Partitioned Table ---")
+    print("=" * 60)
+
+    # Create a partitioned log table
+    partitioned_fields = [
+        pa.field("id", pa.int32()),
+        pa.field("region", pa.string()),  # partition key
+        pa.field("value", pa.int64()),
+    ]
+    partitioned_schema = pa.schema(partitioned_fields)
+    fluss_partitioned_schema = fluss.Schema(partitioned_schema)
+
+    partitioned_table_descriptor = fluss.TableDescriptor(
+        fluss_partitioned_schema,
+        partition_keys=["region"],  # Partition by region
+        bucket_count=1,
+    )
+
+    partitioned_table_path = fluss.TablePath("fluss", 
"partitioned_log_table_py")
+
+    try:
+        # Drop if exists first
+        await admin.drop_table(partitioned_table_path, 
ignore_if_not_exists=True)
+        print(f"Dropped existing table: {partitioned_table_path}")
+
+        # Create the partitioned table
+        await admin.create_table(partitioned_table_path, 
partitioned_table_descriptor, False)
+        print(f"Created partitioned table: {partitioned_table_path}")
+
+        # Create partitions for US and EU regions
+        print("\n--- Creating partitions ---")
+        await admin.create_partition(partitioned_table_path, {"region": "US"}, 
ignore_if_exists=True)
+        print("Created partition: region=US")
+        await admin.create_partition(partitioned_table_path, {"region": "EU"}, 
ignore_if_exists=True)
+        print("Created partition: region=EU")
+
+        # List partitions
+        print("\n--- Listing partitions ---")
+        partition_infos = await 
admin.list_partition_infos(partitioned_table_path)
+        for p in partition_infos:
+            print(f"  {p}")  # PartitionInfo(partition_id=..., 
partition_name='region=...')
+
+        # Get the table and write some data
+        partitioned_table = await conn.get_table(partitioned_table_path)
+        partitioned_writer = await partitioned_table.new_append_writer()
+
+        # Append data to US partition
+        await partitioned_writer.append({"id": 1, "region": "US", "value": 
100})
+        await partitioned_writer.append({"id": 2, "region": "US", "value": 
200})
+        # Append data to EU partition
+        await partitioned_writer.append({"id": 3, "region": "EU", "value": 
300})
+        await partitioned_writer.append({"id": 4, "region": "EU", "value": 
400})
+        partitioned_writer.flush()
+        print("\nWrote 4 records (2 to US, 2 to EU)")
+
+        # Demo: list_partition_offsets
+        print("\n--- Testing list_partition_offsets ---")
+
+        # Query offsets for US partition
+        # Note: partition_name is just the value (e.g., "US"), not "region=US"
+        us_offsets = await admin.list_partition_offsets(
+            partitioned_table_path,
+            partition_name="US",
+            bucket_ids=[0],
+            offset_type="latest"
+        )
+        print(f"US partition latest offsets: {us_offsets}")
+
+        # Query offsets for EU partition
+        eu_offsets = await admin.list_partition_offsets(
+            partitioned_table_path,
+            partition_name="EU",
+            bucket_ids=[0],
+            offset_type="latest"
+        )
+        print(f"EU partition latest offsets: {eu_offsets}")
+
+        # Demo: subscribe_partition for reading partitioned data
+        print("\n--- Testing subscribe_partition + to_arrow() ---")
+        partitioned_scanner = await 
partitioned_table.new_scan().create_batch_scanner()
+
+        # Subscribe to each partition using partition_id
+        for p in partition_infos:
+            partitioned_scanner.subscribe_partition(
+                partition_id=p.partition_id,
+                bucket_id=0,
+                start_offset=fluss.EARLIEST_OFFSET
+            )
+            print(f"Subscribed to partition {p.partition_name} 
(id={p.partition_id})")
+
+        # Use to_arrow() - now works for partitioned tables!
+        partitioned_arrow = partitioned_scanner.to_arrow()
+        print(f"\nto_arrow() returned {partitioned_arrow.num_rows} records 
from partitioned table:")
+        print(partitioned_arrow.to_pandas())
+
+        # Demo: to_pandas() also works for partitioned tables
+        print("\n--- Testing to_pandas() on partitioned table ---")
+        partitioned_scanner2 = await 
partitioned_table.new_scan().create_batch_scanner()
+        for p in partition_infos:
+            partitioned_scanner2.subscribe_partition(p.partition_id, 0, 
fluss.EARLIEST_OFFSET)
+        partitioned_df = partitioned_scanner2.to_pandas()
+        print(f"to_pandas() returned {len(partitioned_df)} records:")
+        print(partitioned_df)
+
+        # Cleanup
+        await admin.drop_table(partitioned_table_path, 
ignore_if_not_exists=True)
+        print(f"\nDropped partitioned table: {partitioned_table_path}")
+
+    except Exception as e:
+        print(f"Error with partitioned table: {e}")
+        traceback.print_exc()
+
     # Close connection
     conn.close()
     print("\nConnection closed")
diff --git a/bindings/python/fluss/__init__.pyi 
b/bindings/python/fluss/__init__.pyi
index 40d18f6..a2bbaac 100644
--- a/bindings/python/fluss/__init__.pyi
+++ b/bindings/python/fluss/__init__.pyi
@@ -129,6 +129,78 @@ class FlussAdmin:
     ) -> None: ...
     async def get_table(self, table_path: TablePath) -> TableInfo: ...
     async def get_latest_lake_snapshot(self, table_path: TablePath) -> 
LakeSnapshot: ...
+    async def drop_table(
+        self,
+        table_path: TablePath,
+        ignore_if_not_exists: bool = False,
+    ) -> None: ...
+    async def list_offsets(
+        self,
+        table_path: TablePath,
+        bucket_ids: List[int],
+        offset_type: str,
+        timestamp: Optional[int] = None,
+    ) -> Dict[int, int]:
+        """List offsets for the specified buckets.
+
+        Args:
+            table_path: Path to the table
+            bucket_ids: List of bucket IDs to query
+            offset_type: "earliest", "latest", or "timestamp"
+            timestamp: Required when offset_type is "timestamp"
+
+        Returns:
+            Dict mapping bucket_id -> offset
+        """
+        ...
+    async def list_partition_offsets(
+        self,
+        table_path: TablePath,
+        partition_name: str,
+        bucket_ids: List[int],
+        offset_type: str,
+        timestamp: Optional[int] = None,
+    ) -> Dict[int, int]:
+        """List offsets for buckets in a specific partition.
+
+        Args:
+            table_path: Path to the table
+            partition_name: Partition value (e.g., "US" not "region=US")
+            bucket_ids: List of bucket IDs to query
+            offset_type: "earliest", "latest", or "timestamp"
+            timestamp: Required when offset_type is "timestamp"
+
+        Returns:
+            Dict mapping bucket_id -> offset
+        """
+        ...
+    async def create_partition(
+        self,
+        table_path: TablePath,
+        partition_spec: Dict[str, str],
+        ignore_if_exists: bool = False,
+    ) -> None:
+        """Create a partition for a partitioned table.
+
+        Args:
+            table_path: Path to the table
+            partition_spec: Dict mapping partition column name to value (e.g., 
{"region": "US"})
+            ignore_if_exists: If True, don't raise error if partition already 
exists
+        """
+        ...
+    async def list_partition_infos(
+        self,
+        table_path: TablePath,
+    ) -> List["PartitionInfo"]:
+        """List all partitions for a partitioned table.
+
+        Args:
+            table_path: Path to the table
+
+        Returns:
+            List of PartitionInfo objects
+        """
+        ...
     def __repr__(self) -> str: ...
 
 class TableScan:
@@ -322,14 +394,30 @@ class LogScanner:
         scanner = await table.new_scan().project([0, 1]).create_log_scanner()
     """
 
-    def subscribe(
-        self, start_timestamp: Optional[int], end_timestamp: Optional[int]
+    def subscribe(self, bucket_id: int, start_offset: int) -> None:
+        """Subscribe to a single bucket at a specific offset (non-partitioned 
tables).
+
+        Args:
+            bucket_id: The bucket ID to subscribe to
+            start_offset: The offset to start reading from (use 
EARLIEST_OFFSET for beginning)
+        """
+        ...
+    def subscribe_buckets(self, bucket_offsets: Dict[int, int]) -> None:
+        """Subscribe to multiple buckets at specified offsets (non-partitioned 
tables).
+
+        Args:
+            bucket_offsets: Dict mapping bucket_id -> start_offset
+        """
+        ...
+    def subscribe_partition(
+        self, partition_id: int, bucket_id: int, start_offset: int
     ) -> None:
-        """Subscribe to log data with timestamp range.
+        """Subscribe to a bucket within a specific partition (partitioned 
tables only).
 
         Args:
-            start_timestamp: Not yet supported, must be None.
-            end_timestamp: Not yet supported, must be None.
+            partition_id: The partition ID (from PartitionInfo.partition_id)
+            bucket_id: The bucket ID within the partition
+            start_offset: The offset to start reading from (use 
EARLIEST_OFFSET for beginning)
         """
         ...
     def poll(self, timeout_ms: int) -> List[ScanRecord]:
@@ -384,12 +472,18 @@ class LogScanner:
         """Convert all data to Pandas DataFrame.
 
         Requires a batch-based scanner (created with 
new_scan().create_batch_scanner()).
+        Reads from currently subscribed buckets until reaching their latest 
offsets.
+
+        You must call subscribe(), subscribe_buckets(), or 
subscribe_partition() first.
         """
         ...
     def to_arrow(self) -> pa.Table:
         """Convert all data to Arrow Table.
 
         Requires a batch-based scanner (created with 
new_scan().create_batch_scanner()).
+        Reads from currently subscribed buckets until reaching their latest 
offsets.
+
+        You must call subscribe(), subscribe_buckets(), or 
subscribe_partition() first.
         """
         ...
     def __repr__(self) -> str: ...
@@ -493,4 +587,27 @@ class TableDistribution:
     def bucket_keys(self) -> List[str]: ...
     def bucket_count(self) -> Optional[int]: ...
 
+class PartitionInfo:
+    """Information about a partition."""
+
+    @property
+    def partition_id(self) -> int:
+        """Get the partition ID (globally unique in the cluster)."""
+        ...
+    @property
+    def partition_name(self) -> str:
+        """Get the partition name."""
+        ...
+    def __repr__(self) -> str: ...
+
+class OffsetType:
+    """Offset type constants for list_offsets()."""
+
+    EARLIEST: str
+    LATEST: str
+    TIMESTAMP: str
+
+# Constant for earliest offset (-2)
+EARLIEST_OFFSET: int
+
 __version__: str
diff --git a/bindings/python/src/admin.rs b/bindings/python/src/admin.rs
index fa189eb..d28c9c0 100644
--- a/bindings/python/src/admin.rs
+++ b/bindings/python/src/admin.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use crate::*;
+use fcore::rpc::message::OffsetSpec;
 use pyo3_async_runtimes::tokio::future_into_py;
 use std::sync::Arc;
 
@@ -25,6 +26,37 @@ pub struct FlussAdmin {
     __admin: Arc<fcore::client::FlussAdmin>,
 }
 
+/// Parse offset_type string into OffsetSpec
+fn parse_offset_spec(offset_type: &str, timestamp: Option<i64>) -> 
PyResult<OffsetSpec> {
+    match offset_type {
+        s if s.eq_ignore_ascii_case("earliest") => Ok(OffsetSpec::Earliest),
+        s if s.eq_ignore_ascii_case("latest") => Ok(OffsetSpec::Latest),
+        s if s.eq_ignore_ascii_case("timestamp") => {
+            let ts = timestamp.ok_or_else(|| {
+                FlussError::new_err("timestamp must be provided when 
offset_type='timestamp'")
+            })?;
+            Ok(OffsetSpec::Timestamp(ts))
+        }
+        _ => Err(FlussError::new_err(format!(
+            "Invalid offset_type: '{}'. Must be 'earliest', 'latest', or 
'timestamp'",
+            offset_type
+        ))),
+    }
+}
+
+/// Validate bucket IDs are non-negative
+fn validate_bucket_ids(bucket_ids: &[i32]) -> PyResult<()> {
+    for &bucket_id in bucket_ids {
+        if bucket_id < 0 {
+            return Err(FlussError::new_err(format!(
+                "Invalid bucket_id: {}. Bucket IDs must be non-negative",
+                bucket_id
+            )));
+        }
+    }
+    Ok(())
+}
+
 #[pymethods]
 impl FlussAdmin {
     /// Create a table with the given schema
@@ -38,7 +70,7 @@ impl FlussAdmin {
     ) -> PyResult<Bound<'py, PyAny>> {
         let ignore = ignore_if_exists.unwrap_or(false);
 
-        let core_table_path = table_path.to_core().clone();
+        let core_table_path = table_path.to_core();
         let core_descriptor = table_descriptor.to_core().clone();
         let admin = self.__admin.clone();
 
@@ -58,7 +90,7 @@ impl FlussAdmin {
         py: Python<'py>,
         table_path: &TablePath,
     ) -> PyResult<Bound<'py, PyAny>> {
-        let core_table_path = table_path.to_core().clone();
+        let core_table_path = table_path.to_core();
         let admin = self.__admin.clone();
 
         future_into_py(py, async move {
@@ -80,7 +112,7 @@ impl FlussAdmin {
         py: Python<'py>,
         table_path: &TablePath,
     ) -> PyResult<Bound<'py, PyAny>> {
-        let core_table_path = table_path.to_core().clone();
+        let core_table_path = table_path.to_core();
         let admin = self.__admin.clone();
 
         future_into_py(py, async move {
@@ -96,6 +128,183 @@ impl FlussAdmin {
         })
     }
 
+    /// Drop a table
+    #[pyo3(signature = (table_path, ignore_if_not_exists=false))]
+    pub fn drop_table<'py>(
+        &self,
+        py: Python<'py>,
+        table_path: &TablePath,
+        ignore_if_not_exists: bool,
+    ) -> PyResult<Bound<'py, PyAny>> {
+        let core_table_path = table_path.to_core();
+        let admin = self.__admin.clone();
+
+        future_into_py(py, async move {
+            admin
+                .drop_table(&core_table_path, ignore_if_not_exists)
+                .await
+                .map_err(|e| FlussError::new_err(format!("Failed to drop 
table: {e}")))?;
+
+            Python::attach(|py| Ok(py.None()))
+        })
+    }
+
+    /// List offsets for buckets (non-partitioned tables only).
+    ///
+    /// Args:
+    ///     table_path: Path to the table
+    ///     bucket_ids: List of bucket IDs to query
+    ///     offset_type: Type of offset to retrieve:
+    ///         - "earliest" or OffsetType.EARLIEST: Start of the log
+    ///         - "latest" or OffsetType.LATEST: End of the log
+    ///         - "timestamp" or OffsetType.TIMESTAMP: Offset at given 
timestamp (requires timestamp arg)
+    ///     timestamp: Required when offset_type is "timestamp", ignored 
otherwise
+    ///
+    /// Returns:
+    ///     dict[int, int]: Mapping of bucket_id -> offset
+    #[pyo3(signature = (table_path, bucket_ids, offset_type, timestamp=None))]
+    pub fn list_offsets<'py>(
+        &self,
+        py: Python<'py>,
+        table_path: &TablePath,
+        bucket_ids: Vec<i32>,
+        offset_type: &str,
+        timestamp: Option<i64>,
+    ) -> PyResult<Bound<'py, PyAny>> {
+        validate_bucket_ids(&bucket_ids)?;
+        let offset_spec = parse_offset_spec(offset_type, timestamp)?;
+
+        let core_table_path = table_path.to_core();
+        let admin = self.__admin.clone();
+
+        future_into_py(py, async move {
+            let offsets = admin
+                .list_offsets(&core_table_path, &bucket_ids, offset_spec)
+                .await
+                .map_err(|e| FlussError::new_err(format!("Failed to list 
offsets: {e}")))?;
+
+            Python::attach(|py| {
+                let dict = pyo3::types::PyDict::new(py);
+                for (bucket_id, offset) in offsets {
+                    dict.set_item(bucket_id, offset)?;
+                }
+                Ok(dict.unbind())
+            })
+        })
+    }
+
+    /// List offsets for buckets in a specific partition of a partitioned 
table.
+    ///
+    /// Args:
+    ///     table_path: Path to the table
+    ///     partition_name: Partition value (e.g., "US" not "region=US")
+    ///     bucket_ids: List of bucket IDs to query
+    ///     offset_type: Type of offset to retrieve:
+    ///         - "earliest" or OffsetType.EARLIEST: Start of the log
+    ///         - "latest" or OffsetType.LATEST: End of the log
+    ///         - "timestamp" or OffsetType.TIMESTAMP: Offset at given 
timestamp (requires timestamp arg)
+    ///     timestamp: Required when offset_type is "timestamp", ignored 
otherwise
+    ///
+    /// Returns:
+    ///     dict[int, int]: Mapping of bucket_id -> offset
+    #[pyo3(signature = (table_path, partition_name, bucket_ids, offset_type, 
timestamp=None))]
+    pub fn list_partition_offsets<'py>(
+        &self,
+        py: Python<'py>,
+        table_path: &TablePath,
+        partition_name: &str,
+        bucket_ids: Vec<i32>,
+        offset_type: &str,
+        timestamp: Option<i64>,
+    ) -> PyResult<Bound<'py, PyAny>> {
+        validate_bucket_ids(&bucket_ids)?;
+        let offset_spec = parse_offset_spec(offset_type, timestamp)?;
+
+        let core_table_path = table_path.to_core();
+        let admin = self.__admin.clone();
+        let partition_name = partition_name.to_string();
+
+        future_into_py(py, async move {
+            let offsets = admin
+                .list_partition_offsets(&core_table_path, &partition_name, 
&bucket_ids, offset_spec)
+                .await
+                .map_err(|e| {
+                    FlussError::new_err(format!("Failed to list partition 
offsets: {e}"))
+                })?;
+
+            Python::attach(|py| {
+                let dict = pyo3::types::PyDict::new(py);
+                for (bucket_id, offset) in offsets {
+                    dict.set_item(bucket_id, offset)?;
+                }
+                Ok(dict.unbind())
+            })
+        })
+    }
+
+    /// Create a partition for a partitioned table.
+    ///
+    /// Args:
+    ///     table_path: Path to the table
+    ///     partition_spec: Dict mapping partition column name to value (e.g., 
{"region": "US"})
+    ///     ignore_if_exists: If True, don't raise error if partition already 
exists
+    ///
+    /// Returns:
+    ///     None
+    #[pyo3(signature = (table_path, partition_spec, ignore_if_exists=false))]
+    pub fn create_partition<'py>(
+        &self,
+        py: Python<'py>,
+        table_path: &TablePath,
+        partition_spec: std::collections::HashMap<String, String>,
+        ignore_if_exists: bool,
+    ) -> PyResult<Bound<'py, PyAny>> {
+        let core_table_path = table_path.to_core();
+        let admin = self.__admin.clone();
+        let core_partition_spec = 
fcore::metadata::PartitionSpec::new(partition_spec);
+
+        future_into_py(py, async move {
+            admin
+                .create_partition(&core_table_path, &core_partition_spec, 
ignore_if_exists)
+                .await
+                .map_err(|e| FlussError::new_err(format!("Failed to create 
partition: {e}")))?;
+
+            Python::attach(|py| Ok(py.None()))
+        })
+    }
+
+    /// List all partitions for a partitioned table.
+    ///
+    /// Args:
+    ///     table_path: Path to the table
+    ///
+    /// Returns:
+    ///     List[PartitionInfo]: List of partition info objects
+    pub fn list_partition_infos<'py>(
+        &self,
+        py: Python<'py>,
+        table_path: &TablePath,
+    ) -> PyResult<Bound<'py, PyAny>> {
+        let core_table_path = table_path.to_core();
+        let admin = self.__admin.clone();
+
+        future_into_py(py, async move {
+            let partition_infos = admin
+                .list_partition_infos(&core_table_path)
+                .await
+                .map_err(|e| FlussError::new_err(format!("Failed to list 
partitions: {e}")))?;
+
+            Python::attach(|py| {
+                let py_list = pyo3::types::PyList::empty(py);
+                for info in partition_infos {
+                    let py_info = PartitionInfo::from_core(info);
+                    py_list.append(Py::new(py, py_info)?)?;
+                }
+                Ok(py_list.unbind())
+            })
+        })
+    }
+
     fn __repr__(&self) -> String {
         "FlussAdmin()".to_string()
     }
@@ -109,3 +318,41 @@ impl FlussAdmin {
         }
     }
 }
+
+/// Information about a partition
+#[pyclass]
+pub struct PartitionInfo {
+    partition_id: i64,
+    partition_name: String,
+}
+
+#[pymethods]
+impl PartitionInfo {
+    /// Get the partition ID (globally unique in the cluster)
+    #[getter]
+    fn partition_id(&self) -> i64 {
+        self.partition_id
+    }
+
+    /// Get the partition name (e.g., "US" for a table partitioned by region)
+    #[getter]
+    fn partition_name(&self) -> &str {
+        &self.partition_name
+    }
+
+    fn __repr__(&self) -> String {
+        format!(
+            "PartitionInfo(partition_id={}, partition_name='{}')",
+            self.partition_id, self.partition_name
+        )
+    }
+}
+
+impl PartitionInfo {
+    pub fn from_core(info: fcore::metadata::PartitionInfo) -> Self {
+        Self {
+            partition_id: info.get_partition_id(),
+            partition_name: info.get_partition_name(),
+        }
+    }
+}
diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs
index ce063ab..ae7f6c5 100644
--- a/bindings/python/src/lib.rs
+++ b/bindings/python/src/lib.rs
@@ -48,6 +48,23 @@ static TOKIO_RUNTIME: LazyLock<Runtime> = LazyLock::new(|| {
         .expect("Failed to create Tokio runtime")
 });
 
/// Offset type constants for list_offsets()
///
/// Exposed to Python as string constants so callers can write
/// `fluss.OffsetType.LATEST` instead of a bare string literal.
#[pyclass]
#[derive(Clone)]
pub struct OffsetType;

#[pymethods]
impl OffsetType {
    /// Query the earliest available offset of each bucket.
    #[classattr]
    const EARLIEST: &'static str = "earliest";

    /// Query the latest offset of each bucket.
    #[classattr]
    const LATEST: &'static str = "latest";

    /// Query the offset corresponding to a timestamp.
    #[classattr]
    const TIMESTAMP: &'static str = "timestamp";
}
+
 #[pymodule]
 fn _fluss(m: &Bound<'_, PyModule>) -> PyResult<()> {
     // Register all classes
@@ -69,6 +86,11 @@ fn _fluss(m: &Bound<'_, PyModule>) -> PyResult<()> {
     m.add_class::<ChangeType>()?;
     m.add_class::<ScanRecord>()?;
     m.add_class::<RecordBatch>()?;
+    m.add_class::<PartitionInfo>()?;
+    m.add_class::<OffsetType>()?;
+
+    // Register constants
+    m.add("EARLIEST_OFFSET", fcore::client::EARLIEST_OFFSET)?;
 
     // Register exception types
     m.add_class::<FlussError>()?;
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index 30c7ce0..c285f25 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -20,7 +20,6 @@ use crate::*;
 use arrow::array::RecordBatch as ArrowRecordBatch;
 use arrow_pyarrow::{FromPyArrow, ToPyArrow};
 use arrow_schema::SchemaRef;
-use fluss::client::EARLIEST_OFFSET;
 use fluss::record::to_arrow_schema;
 use fluss::rpc::message::OffsetSpec;
 use pyo3::types::IntoPyDict;
@@ -1573,155 +1572,95 @@ pub struct LogScanner {
     projected_schema: SchemaRef,
     /// The projected row type to use for record-based scanning
     projected_row_type: fcore::metadata::RowType,
-    #[allow(dead_code)]
-    start_timestamp: Option<i64>,
-    #[allow(dead_code)]
-    end_timestamp: Option<i64>,
+    /// Cache for partition_id -> partition_name mapping (avoids repeated 
list_partition_infos calls)
+    partition_name_cache: std::sync::RwLock<Option<HashMap<i64, String>>>,
 }
 
 #[pymethods]
 impl LogScanner {
-    /// Subscribe to log data with timestamp range
-    fn subscribe(
-        &mut self,
-        _start_timestamp: Option<i64>,
-        _end_timestamp: Option<i64>,
-    ) -> PyResult<()> {
-        if _start_timestamp.is_some() {
-            return Err(FlussError::new_err(
-                "Specifying start_timestamp is not yet supported. Please use 
None.".to_string(),
-            ));
-        }
-        if _end_timestamp.is_some() {
-            return Err(FlussError::new_err(
-                "Specifying end_timestamp is not yet supported. Please use 
None.".to_string(),
-            ));
-        }
-
-        let num_buckets = self.table_info.get_num_buckets();
-        for bucket_id in 0..num_buckets {
-            let start_offset = EARLIEST_OFFSET;
-
-            // Subscribe to the appropriate scanner
-            if let Some(ref inner) = self.inner {
-                TOKIO_RUNTIME.block_on(async {
+    /// Subscribe to a single bucket at a specific offset (non-partitioned 
tables).
+    ///
+    /// Args:
+    ///     bucket_id: The bucket ID to subscribe to
+    ///     start_offset: The offset to start reading from (use 
EARLIEST_OFFSET for beginning)
+    fn subscribe(&self, py: Python, bucket_id: i32, start_offset: i64) -> 
PyResult<()> {
+        py.detach(|| {
+            TOKIO_RUNTIME.block_on(async {
+                if let Some(ref inner) = self.inner {
                     inner
                         .subscribe(bucket_id, start_offset)
                         .await
-                        .map_err(|e| FlussError::new_err(e.to_string()))
-                })?;
-            } else if let Some(ref inner_batch) = self.inner_batch {
-                TOKIO_RUNTIME.block_on(async {
+                        .map_err(|e| FlussError::new_err(format!("Failed to 
subscribe: {e}")))
+                } else if let Some(ref inner_batch) = self.inner_batch {
                     inner_batch
                         .subscribe(bucket_id, start_offset)
                         .await
-                        .map_err(|e| FlussError::new_err(e.to_string()))
-                })?;
-            } else {
-                return Err(FlussError::new_err("No scanner available"));
-            }
-        }
-
-        Ok(())
+                        .map_err(|e| FlussError::new_err(format!("Failed to 
subscribe: {e}")))
+                } else {
+                    Err(FlussError::new_err("No scanner available"))
+                }
+            })
+        })
     }
 
-    /// Convert all data to Arrow Table
+    /// Subscribe to multiple buckets at specified offsets (non-partitioned 
tables).
     ///
-    /// Note: Requires a batch-based scanner (created with 
new_scan().create_batch_scanner()).
-    fn to_arrow(&self, py: Python) -> PyResult<Py<PyAny>> {
-        let inner_batch = self.inner_batch.as_ref().ok_or_else(|| {
-            FlussError::new_err(
-                "Batch-based scanner not available. Use 
new_scan().create_batch_scanner() to create a scanner \
-                 that supports to_arrow().",
-            )
-        })?;
-
-        let mut all_batches = Vec::new();
-
-        let num_buckets = self.table_info.get_num_buckets();
-        let bucket_ids: Vec<i32> = (0..num_buckets).collect();
-
-        // todo: after supporting list_offsets with timestamp, we can use 
start_timestamp and end_timestamp here
-        let mut stopping_offsets: HashMap<i32, i64> = py
-            .detach(|| {
-                TOKIO_RUNTIME.block_on(async {
-                    self.admin
-                        .list_offsets(
-                            &self.table_info.table_path,
-                            bucket_ids.as_slice(),
-                            OffsetSpec::Latest,
-                        )
+    /// Args:
+    ///     bucket_offsets: A dict mapping bucket_id -> start_offset
+    fn subscribe_buckets(&self, py: Python, bucket_offsets: HashMap<i32, i64>) 
-> PyResult<()> {
+        py.detach(|| {
+            TOKIO_RUNTIME.block_on(async {
+                if let Some(ref inner) = self.inner {
+                    inner
+                        .subscribe_buckets(&bucket_offsets)
                         .await
-                })
-            })
-            .map_err(|e| FlussError::new_err(e.to_string()))?;
-
-        // Filter out buckets with no records to read (stop_at <= 0)
-        stopping_offsets.retain(|_, &mut v| v > 0);
-
-        while !stopping_offsets.is_empty() {
-            let scan_batches = py
-                .detach(|| {
-                    TOKIO_RUNTIME
-                        .block_on(async { 
inner_batch.poll(Duration::from_millis(500)).await })
-                })
-                .map_err(|e| FlussError::new_err(e.to_string()))?;
-
-            if scan_batches.is_empty() {
-                continue;
-            }
-
-            for scan_batch in scan_batches {
-                let bucket_id = scan_batch.bucket().bucket_id();
-
-                // Check if this bucket is still being tracked; if not, ignore 
the batch
-                let Some(&stop_at) = stopping_offsets.get(&bucket_id) else {
-                    continue;
-                };
-
-                let base_offset = scan_batch.base_offset();
-                let last_offset = scan_batch.last_offset();
-
-                // If the batch starts at or after the stop_at offset, the 
bucket is exhausted
-                if base_offset >= stop_at {
-                    stopping_offsets.remove(&bucket_id);
-                    continue;
-                }
-
-                let batch = if last_offset >= stop_at {
-                    // This batch contains the target offset; slice it to keep 
only records
-                    // where offset < stop_at.
-                    let num_to_keep = (stop_at - base_offset) as usize;
-                    let b = scan_batch.into_batch();
-
-                    // Safety check: ensure we don't attempt to slice more 
rows than the batch contains
-                    let limit = num_to_keep.min(b.num_rows());
-                    b.slice(0, limit)
+                        .map_err(|e| FlussError::new_err(format!("Failed to 
subscribe batch: {e}")))
+                } else if let Some(ref inner_batch) = self.inner_batch {
+                    inner_batch
+                        .subscribe_buckets(&bucket_offsets)
+                        .await
+                        .map_err(|e| FlussError::new_err(format!("Failed to 
subscribe batch: {e}")))
                 } else {
-                    // The entire batch is within the desired range (all 
offsets < stop_at)
-                    scan_batch.into_batch()
-                };
-
-                all_batches.push(Arc::new(batch));
-
-                // If the batch's last offset reached or passed the inclusive 
limit (stop_at - 1),
-                // we are done with this bucket.
-                if last_offset >= stop_at - 1 {
-                    stopping_offsets.remove(&bucket_id);
+                    Err(FlussError::new_err("No scanner available"))
                 }
-            }
-        }
-
-        Utils::combine_batches_to_table(py, all_batches)
+            })
+        })
     }
 
-    /// Convert all data to Pandas DataFrame
-    fn to_pandas(&self, py: Python) -> PyResult<Py<PyAny>> {
-        let arrow_table = self.to_arrow(py)?;
-
-        // Convert Arrow Table to Pandas DataFrame using pyarrow
-        let df = arrow_table.call_method0(py, "to_pandas")?;
-        Ok(df)
+    /// Subscribe to a bucket within a specific partition (partitioned tables 
only).
+    ///
+    /// Args:
+    ///     partition_id: The partition ID (from PartitionInfo.partition_id)
+    ///     bucket_id: The bucket ID within the partition
+    ///     start_offset: The offset to start reading from (use 
EARLIEST_OFFSET for beginning)
+    fn subscribe_partition(
+        &self,
+        py: Python,
+        partition_id: i64,
+        bucket_id: i32,
+        start_offset: i64,
+    ) -> PyResult<()> {
+        py.detach(|| {
+            TOKIO_RUNTIME.block_on(async {
+                if let Some(ref inner) = self.inner {
+                    inner
+                        .subscribe_partition(partition_id, bucket_id, 
start_offset)
+                        .await
+                        .map_err(|e| {
+                            FlussError::new_err(format!("Failed to subscribe 
partition: {e}"))
+                        })
+                } else if let Some(ref inner_batch) = self.inner_batch {
+                    inner_batch
+                        .subscribe_partition(partition_id, bucket_id, 
start_offset)
+                        .await
+                        .map_err(|e| {
+                            FlussError::new_err(format!("Failed to subscribe 
partition: {e}"))
+                        })
+                } else {
+                    Err(FlussError::new_err("No scanner available"))
+                }
+            })
+        })
     }
 
     /// Poll for individual records with metadata.
@@ -1873,6 +1812,54 @@ impl LogScanner {
         Ok(empty_table.into())
     }
 
+    /// Convert all data to Arrow Table.
+    ///
+    /// Reads from currently subscribed buckets until reaching their latest 
offsets.
+    /// Works for both partitioned and non-partitioned tables.
+    ///
+    /// You must call subscribe(), subscribe_buckets(), or 
subscribe_partition() first.
+    ///
+    /// Returns:
+    ///     PyArrow Table containing all data from subscribed buckets
+    fn to_arrow(&self, py: Python) -> PyResult<Py<PyAny>> {
+        // 1. Get subscribed buckets from scanner (requires batch scanner for 
get_subscribed_buckets)
+        let inner_batch = self.inner_batch.as_ref().ok_or_else(|| {
+            FlussError::new_err(
+                "Batch-based scanner not available. Use 
new_scan().create_batch_scanner() to create a scanner \
+                 that supports to_arrow().",
+            )
+        })?;
+        let subscribed = inner_batch.get_subscribed_buckets();
+        if subscribed.is_empty() {
+            return Err(FlussError::new_err(
+                "No buckets subscribed. Call subscribe(), subscribe_buckets(), 
or subscribe_partition() first.",
+            ));
+        }
+
+        // 2. Query latest offsets for all subscribed buckets
+        let stopping_offsets = self.query_latest_offsets(py, &subscribed)?;
+
+        // 3. Poll until all buckets reach their stopping offsets
+        self.poll_until_offsets(py, stopping_offsets)
+    }
+
+    /// Convert all data to Pandas DataFrame.
+    ///
+    /// Reads from currently subscribed buckets until reaching their latest 
offsets.
+    /// Works for both partitioned and non-partitioned tables.
+    ///
+    /// You must call subscribe(), subscribe_buckets(), or 
subscribe_partition() first.
+    ///
+    /// Returns:
+    ///     Pandas DataFrame containing all data from subscribed buckets
+    fn to_pandas(&self, py: Python) -> PyResult<Py<PyAny>> {
+        let arrow_table = self.to_arrow(py)?;
+
+        // Convert Arrow Table to Pandas DataFrame using pyarrow
+        let df = arrow_table.call_method0(py, "to_pandas")?;
+        Ok(df)
+    }
+
    /// Human-readable representation returned to Python's repr().
    fn __repr__(&self) -> String {
        format!("LogScanner(table={})", self.table_info.table_path)
    }
@@ -1894,8 +1881,7 @@ impl LogScanner {
             table_info,
             projected_schema,
             projected_row_type,
-            start_timestamp: None,
-            end_timestamp: None,
+            partition_name_cache: std::sync::RwLock::new(None),
         }
     }
 
@@ -1914,9 +1900,215 @@ impl LogScanner {
             table_info,
             projected_schema,
             projected_row_type,
-            start_timestamp: None,
-            end_timestamp: None,
+            partition_name_cache: std::sync::RwLock::new(None),
+        }
+    }
+
+    /// Get partition_id -> partition_name mapping, using cache if available
+    fn get_partition_name_map(
+        &self,
+        py: Python,
+        table_path: &fcore::metadata::TablePath,
+    ) -> PyResult<HashMap<i64, String>> {
+        // Check cache first (read lock)
+        {
+            let cache = self.partition_name_cache.read().unwrap();
+            if let Some(map) = cache.as_ref() {
+                return Ok(map.clone());
+            }
+        }
+
+        // Fetch partition infos (releases GIL during async call)
+        let partition_infos: Vec<fcore::metadata::PartitionInfo> = py
+            .detach(|| {
+                TOKIO_RUNTIME.block_on(async { 
self.admin.list_partition_infos(table_path).await })
+            })
+            .map_err(|e| FlussError::new_err(format!("Failed to list partition 
infos: {e}")))?;
+
+        // Build and cache the mapping
+        let map: HashMap<i64, String> = partition_infos
+            .into_iter()
+            .map(|info| (info.get_partition_id(), info.get_partition_name()))
+            .collect();
+
+        // Store in cache (write lock)
+        {
+            let mut cache = self.partition_name_cache.write().unwrap();
+            *cache = Some(map.clone());
+        }
+
+        Ok(map)
+    }
+
+    /// Query latest offsets for subscribed buckets (handles both partitioned 
and non-partitioned)
+    fn query_latest_offsets(
+        &self,
+        py: Python,
+        subscribed: &[(fcore::metadata::TableBucket, i64)],
+    ) -> PyResult<HashMap<fcore::metadata::TableBucket, i64>> {
+        let inner_batch = self.inner_batch.as_ref().ok_or_else(|| {
+            FlussError::new_err("Batch-based scanner required for this 
operation")
+        })?;
+        let is_partitioned = inner_batch.is_partitioned();
+        let table_path = &self.table_info.table_path;
+
+        if !is_partitioned {
+            // Non-partitioned: simple case - just query all bucket IDs
+            let bucket_ids: Vec<i32> = subscribed.iter().map(|(tb, _)| 
tb.bucket_id()).collect();
+
+            let offsets: HashMap<i32, i64> = py
+                .detach(|| {
+                    TOKIO_RUNTIME.block_on(async {
+                        self.admin
+                            .list_offsets(table_path, &bucket_ids, 
OffsetSpec::Latest)
+                            .await
+                    })
+                })
+                .map_err(|e| FlussError::new_err(format!("Failed to list 
offsets: {e}")))?;
+
+            // Convert to TableBucket-keyed map
+            let table_id = self.table_info.table_id;
+            Ok(offsets
+                .into_iter()
+                .filter(|(_, offset)| *offset > 0)
+                .map(|(bucket_id, offset)| {
+                    (
+                        fcore::metadata::TableBucket::new(table_id, bucket_id),
+                        offset,
+                    )
+                })
+                .collect())
+        } else {
+            // Partitioned: need to query per partition
+            self.query_partitioned_offsets(py, subscribed)
+        }
+    }
+
    /// Query latest offsets for partitioned-table subscriptions.
    ///
    /// Resolves each subscribed bucket's partition name through the cached
    /// partition map, then issues one `list_partition_offsets` RPC per
    /// partition. Buckets whose latest offset is 0 (no records) are omitted
    /// from the result.
    fn query_partitioned_offsets(
        &self,
        py: Python,
        subscribed: &[(fcore::metadata::TableBucket, i64)],
    ) -> PyResult<HashMap<fcore::metadata::TableBucket, i64>> {
        let table_path = &self.table_info.table_path;

        // Get partition_id -> partition_name mapping (cached)
        let partition_id_to_name = self.get_partition_name_map(py, table_path)?;

        // Group subscribed buckets by partition_id so we make one RPC per
        // partition rather than one per bucket. Buckets without a
        // partition_id are silently skipped (not expected on a partitioned table).
        let mut by_partition: HashMap<i64, Vec<i32>> = HashMap::new();
        for (tb, _) in subscribed {
            if let Some(partition_id) = tb.partition_id() {
                by_partition
                    .entry(partition_id)
                    .or_default()
                    .push(tb.bucket_id());
            }
        }

        // Query offsets for each partition
        let mut result: HashMap<fcore::metadata::TableBucket, i64> = HashMap::new();
        let table_id = self.table_info.table_id;

        for (partition_id, bucket_ids) in by_partition {
            // A subscribed partition missing from the map means our cached view
            // is out of date (e.g. the partition was dropped) — surface an error.
            let partition_name = partition_id_to_name.get(&partition_id).ok_or_else(|| {
                FlussError::new_err(format!("Unknown partition_id: {partition_id}"))
            })?;

            // Release the GIL for the duration of the RPC.
            let offsets: HashMap<i32, i64> = py
                .detach(|| {
                    TOKIO_RUNTIME.block_on(async {
                        self.admin
                            .list_partition_offsets(
                                table_path,
                                partition_name,
                                &bucket_ids,
                                OffsetSpec::Latest,
                            )
                            .await
                    })
                })
                .map_err(|e| {
                    FlussError::new_err(format!(
                        "Failed to list offsets for partition {partition_name}: {e}"
                    ))
                })?;

            // Keep only buckets that actually have records (latest offset > 0).
            for (bucket_id, offset) in offsets {
                if offset > 0 {
                    let tb = fcore::metadata::TableBucket::new_with_partition(
                        table_id,
                        Some(partition_id),
                        bucket_id,
                    );
                    result.insert(tb, offset);
                }
            }
        }

        Ok(result)
    }
+
    /// Poll until all buckets reach their stopping offsets.
    ///
    /// Repeatedly polls the batch scanner (500 ms per poll) and accumulates
    /// Arrow record batches until every tracked bucket has yielded records up
    /// to its stop offset; batches straddling the boundary are sliced so only
    /// records with offset < stop_at are kept. Returns the combined PyArrow table.
    ///
    /// NOTE(review): if a bucket never reaches its stopping offset this loop
    /// spins without a timeout — confirm that is acceptable for the bindings.
    fn poll_until_offsets(
        &self,
        py: Python,
        mut stopping_offsets: HashMap<fcore::metadata::TableBucket, i64>,
    ) -> PyResult<Py<PyAny>> {
        let inner_batch = self.inner_batch.as_ref().ok_or_else(|| {
            FlussError::new_err("Batch-based scanner required for this operation")
        })?;
        let mut all_batches = Vec::new();

        while !stopping_offsets.is_empty() {
            // Poll with the GIL released so Python threads can make progress.
            let scan_batches = py
                .detach(|| {
                    TOKIO_RUNTIME
                        .block_on(async { inner_batch.poll(Duration::from_millis(500)).await })
                })
                .map_err(|e| FlussError::new_err(format!("Failed to poll: {e}")))?;

            // Empty poll: data not available yet, retry.
            if scan_batches.is_empty() {
                continue;
            }

            for scan_batch in scan_batches {
                let table_bucket = scan_batch.bucket().clone();

                // Check if this bucket is still being tracked
                let Some(&stop_at) = stopping_offsets.get(&table_bucket) else {
                    continue;
                };

                let base_offset = scan_batch.base_offset();
                let last_offset = scan_batch.last_offset();

                // If the batch starts at or after the stop_at offset, the bucket is exhausted
                if base_offset >= stop_at {
                    stopping_offsets.remove(&table_bucket);
                    continue;
                }

                let batch = if last_offset >= stop_at {
                    // Slice batch to keep only records where offset < stop_at
                    let num_to_keep = (stop_at - base_offset) as usize;
                    let b = scan_batch.into_batch();
                    // Guard against slicing past the batch's actual row count.
                    let limit = num_to_keep.min(b.num_rows());
                    b.slice(0, limit)
                } else {
                    scan_batch.into_batch()
                };

                all_batches.push(Arc::new(batch));

                // Check if we're done with this bucket (stop_at - 1 is the
                // last offset we intend to include).
                if last_offset >= stop_at - 1 {
                    stopping_offsets.remove(&table_bucket);
                }
            }
        }

        Utils::combine_batches_to_table(py, all_batches)
    }
 }
 
diff --git a/crates/fluss/src/client/table/scanner.rs 
b/crates/fluss/src/client/table/scanner.rs
index ef68fb4..d50f19e 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -514,6 +514,16 @@ impl RecordBatchLogScanner {
             .subscribe_partition(partition_id, bucket, offset)
             .await
     }
+
    /// Returns whether the underlying table is partitioned.
    ///
    /// Exposes the scanner's `is_partitioned_table` flag so callers (e.g. the
    /// language bindings) can choose between per-table and per-partition
    /// offset queries.
    pub fn is_partitioned(&self) -> bool {
        self.inner.is_partitioned_table
    }
+
    /// Returns all subscribed buckets with their current offsets.
    ///
    /// Delegates to the scanner status, which tracks the buckets this scanner
    /// is subscribed to together with each bucket's current scan position.
    pub fn get_subscribed_buckets(&self) -> Vec<(TableBucket, i64)> {
        self.inner.log_scanner_status.get_all_subscriptions()
    }
 }
 
 struct LogFetcher {
@@ -1512,6 +1522,16 @@ impl LogScannerStatus {
         result
     }
 
+    /// Returns all subscribed buckets with their current offsets
+    pub fn get_all_subscriptions(&self) -> Vec<(TableBucket, i64)> {
+        let map = self.bucket_status_map.read();
+        let mut result = Vec::new();
+        map.for_each(|bucket, status| {
+            result.push((bucket.clone(), status.offset()));
+        });
+        result
+    }
+
     /// Helper to get bucket status
     fn get_status(&self, table_bucket: &TableBucket) -> 
Option<Arc<BucketScanStatus>> {
         let map = self.bucket_status_map.read();

Reply via email to