This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 6de2cde feat: Support drop_table, partitions and offsets methods in python bindings (#150)
6de2cde is described below
commit 6de2cde9ed9ea9f0ece9cc246af285587485f342
Author: Anton Borisov <[email protected]>
AuthorDate: Thu Feb 5 23:27:55 2026 +0000
feat: Support drop_table, partitions and offsets methods in python bindings (#150)
---
bindings/python/example/example.py | 202 ++++++++++++--
bindings/python/fluss/__init__.pyi | 127 ++++++++-
bindings/python/src/admin.rs | 253 ++++++++++++++++-
bindings/python/src/lib.rs | 22 ++
bindings/python/src/table.rs | 464 ++++++++++++++++++++++---------
crates/fluss/src/client/table/scanner.rs | 20 ++
6 files changed, 924 insertions(+), 164 deletions(-)
diff --git a/bindings/python/example/example.py b/bindings/python/example/example.py
index 9cb8f43..8735038 100644
--- a/bindings/python/example/example.py
+++ b/bindings/python/example/example.py
@@ -87,6 +87,19 @@ async def main():
except Exception as e:
print(f"Failed to get table info: {e}")
+ # Demo: List offsets
+ print("\n--- Testing list_offsets() ---")
+ try:
+ # Query latest offsets using OffsetType constant (recommended for type
safety)
+ offsets = await admin.list_offsets(
+ table_path,
+ bucket_ids=[0],
+ offset_type=fluss.OffsetType.LATEST
+ )
+ print(f"Latest offsets for table (before writes): {offsets}")
+ except Exception as e:
+ print(f"Failed to list offsets: {e}")
+
# Get the table instance
table = await conn.get_table(table_path)
print(f"Got table: {table}")
@@ -96,7 +109,7 @@ async def main():
print(f"Created append writer: {append_writer}")
try:
- # Test 1: Write PyArrow Table
+ # Demo: Write PyArrow Table
print("\n--- Testing PyArrow Table write ---")
pa_table = pa.Table.from_arrays(
[
@@ -139,7 +152,7 @@ async def main():
append_writer.write_arrow(pa_table)
print("Successfully wrote PyArrow Table")
- # Test 2: Write PyArrow RecordBatch
+ # Demo: Write PyArrow RecordBatch
print("\n--- Testing PyArrow RecordBatch write ---")
pa_record_batch = pa.RecordBatch.from_arrays(
[
@@ -202,7 +215,7 @@ async def main():
)
print("Successfully appended row (list with Date, Time, Timestamp,
Decimal)")
- # Test 4: Write Pandas DataFrame
+ # Demo: Write Pandas DataFrame
print("\n--- Testing Pandas DataFrame write ---")
df = pd.DataFrame(
{
@@ -232,6 +245,19 @@ async def main():
append_writer.flush()
print("Successfully flushed data")
+ # Demo: Check offsets after writes
+ print("\n--- Checking offsets after writes ---")
+ try:
+ # Query with string constant (alternative API - both strings and
constants are supported)
+ offsets = await admin.list_offsets(
+ table_path,
+ bucket_ids=[0],
+ offset_type="latest" # Can also use "earliest" or "timestamp"
+ )
+ print(f"Latest offsets after writing 7 records: {offsets}")
+ except Exception as e:
+ print(f"Failed to list offsets: {e}")
+
except Exception as e:
print(f"Error during writing: {e}")
@@ -242,10 +268,13 @@ async def main():
batch_scanner = await table.new_scan().create_batch_scanner()
print(f"Created batch scanner: {batch_scanner}")
- # Subscribe to scan from earliest to latest
- # start_timestamp=None (earliest), end_timestamp=None (latest)
- batch_scanner.subscribe(None, None)
+ # Subscribe to buckets (required before to_arrow/to_pandas)
+ # Use subscribe_buckets to subscribe all buckets from EARLIEST_OFFSET
+ num_buckets = (await admin.get_table(table_path)).num_buckets
+ batch_scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in
range(num_buckets)})
+ print(f"Subscribed to {num_buckets} buckets from EARLIEST_OFFSET")
+ # Read all data using to_arrow()
print("Scanning results using to_arrow():")
# Try to get as PyArrow Table
@@ -255,13 +284,13 @@ async def main():
except Exception as e:
print(f"Could not convert to PyArrow: {e}")
- # Let's subscribe from the beginning again.
- # Reset subscription
- batch_scanner.subscribe(None, None)
+ # Create a new batch scanner for to_pandas() test
+ batch_scanner2 = await table.new_scan().create_batch_scanner()
+ batch_scanner2.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in
range(num_buckets)})
# Try to get as Pandas DataFrame
try:
- df_result = batch_scanner.to_pandas()
+ df_result = batch_scanner2.to_pandas()
print(f"\nAs Pandas DataFrame:\n{df_result}")
except Exception as e:
print(f"Could not convert to Pandas: {e}")
@@ -273,13 +302,14 @@ async def main():
# Test poll_arrow() method for incremental reading as Arrow Table
print("\n--- Testing poll_arrow() method ---")
- # Reset subscription to start from the beginning
- batch_scanner.subscribe(None, None)
+ batch_scanner3 = await table.new_scan().create_batch_scanner()
+ batch_scanner3.subscribe(bucket_id=0,
start_offset=fluss.EARLIEST_OFFSET)
+ print(f"Subscribed to bucket 0 at EARLIEST_OFFSET
({fluss.EARLIEST_OFFSET})")
# Poll with a timeout of 5000ms (5 seconds)
# Note: poll_arrow() returns an empty table (not an error) on timeout
try:
- poll_result = batch_scanner.poll_arrow(5000)
+ poll_result = batch_scanner3.poll_arrow(5000)
print(f"Number of rows: {poll_result.num_rows}")
if poll_result.num_rows > 0:
@@ -287,7 +317,7 @@ async def main():
print(f"Polled data:\n{poll_df}")
else:
print("Empty result (no records available)")
- # Empty table still has schema
+ # Empty table still has schema - this is useful!
print(f"Schema: {poll_result.schema}")
except Exception as e:
@@ -295,10 +325,11 @@ async def main():
# Test poll_batches() method for batches with metadata
print("\n--- Testing poll_batches() method ---")
- batch_scanner.subscribe(None, None)
+ batch_scanner4 = await table.new_scan().create_batch_scanner()
+ batch_scanner4.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in
range(num_buckets)})
try:
- batches = batch_scanner.poll_batches(5000)
+ batches = batch_scanner4.poll_batches(5000)
print(f"Number of batches: {len(batches)}")
for i, batch in enumerate(batches):
@@ -319,7 +350,7 @@ async def main():
record_scanner = await table.new_scan().create_log_scanner()
print(f"Created record scanner: {record_scanner}")
- record_scanner.subscribe(None, None)
+ record_scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in
range(num_buckets)})
# Poll returns List[ScanRecord] with per-record metadata
print("\n--- Testing poll() method (record-by-record) ---")
@@ -539,10 +570,13 @@ async def main():
# Demo: Column projection using builder pattern
print("\n--- Testing Column Projection ---")
try:
+ # Get bucket count for subscriptions
+ num_buckets = (await admin.get_table(table_path)).num_buckets
+
# Project specific columns by index (using batch scanner for to_pandas)
print("\n1. Projection by index [0, 1] (id, name):")
scanner_index = await table.new_scan().project([0,
1]).create_batch_scanner()
- scanner_index.subscribe(None, None)
+ scanner_index.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in
range(num_buckets)})
df_projected = scanner_index.to_pandas()
print(df_projected.head())
print(
@@ -554,7 +588,7 @@ async def main():
scanner_names = await table.new_scan() \
.project_by_name(["name", "score"]) \
.create_batch_scanner()
- scanner_names.subscribe(None, None)
+ scanner_names.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in
range(num_buckets)})
df_named = scanner_names.to_pandas()
print(df_named.head())
print(f" Projected {df_named.shape[1]} columns:
{list(df_named.columns)}")
@@ -562,7 +596,7 @@ async def main():
# Test empty result schema with projection
print("\n3. Testing empty result schema with projection:")
scanner_proj = await table.new_scan().project([0,
2]).create_batch_scanner()
- scanner_proj.subscribe(None, None)
+ scanner_proj.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in
range(num_buckets)})
# Quick poll that may return empty
result = scanner_proj.poll_arrow(100)
print(f" Schema columns: {result.schema.names}")
@@ -570,6 +604,134 @@ async def main():
except Exception as e:
print(f"Error during projection: {e}")
+ # Demo: Drop tables
+ print("\n--- Testing drop_table() ---")
+ try:
+ # Drop the log table
+ await admin.drop_table(table_path, ignore_if_not_exists=True)
+ print(f"Successfully dropped table: {table_path}")
+ # Drop the PK table
+ await admin.drop_table(pk_table_path, ignore_if_not_exists=True)
+ print(f"Successfully dropped table: {pk_table_path}")
+ except Exception as e:
+ print(f"Failed to drop table: {e}")
+
+ # =====================================================
+ # Demo: Partitioned Table with list_partition_offsets
+ # =====================================================
+ print("\n" + "=" * 60)
+ print("--- Testing Partitioned Table ---")
+ print("=" * 60)
+
+ # Create a partitioned log table
+ partitioned_fields = [
+ pa.field("id", pa.int32()),
+ pa.field("region", pa.string()), # partition key
+ pa.field("value", pa.int64()),
+ ]
+ partitioned_schema = pa.schema(partitioned_fields)
+ fluss_partitioned_schema = fluss.Schema(partitioned_schema)
+
+ partitioned_table_descriptor = fluss.TableDescriptor(
+ fluss_partitioned_schema,
+ partition_keys=["region"], # Partition by region
+ bucket_count=1,
+ )
+
+ partitioned_table_path = fluss.TablePath("fluss",
"partitioned_log_table_py")
+
+ try:
+ # Drop if exists first
+ await admin.drop_table(partitioned_table_path,
ignore_if_not_exists=True)
+ print(f"Dropped existing table: {partitioned_table_path}")
+
+ # Create the partitioned table
+ await admin.create_table(partitioned_table_path,
partitioned_table_descriptor, False)
+ print(f"Created partitioned table: {partitioned_table_path}")
+
+ # Create partitions for US and EU regions
+ print("\n--- Creating partitions ---")
+ await admin.create_partition(partitioned_table_path, {"region": "US"},
ignore_if_exists=True)
+ print("Created partition: region=US")
+ await admin.create_partition(partitioned_table_path, {"region": "EU"},
ignore_if_exists=True)
+ print("Created partition: region=EU")
+
+ # List partitions
+ print("\n--- Listing partitions ---")
+ partition_infos = await
admin.list_partition_infos(partitioned_table_path)
+ for p in partition_infos:
+ print(f" {p}") # PartitionInfo(partition_id=...,
partition_name='region=...')
+
+ # Get the table and write some data
+ partitioned_table = await conn.get_table(partitioned_table_path)
+ partitioned_writer = await partitioned_table.new_append_writer()
+
+ # Append data to US partition
+ await partitioned_writer.append({"id": 1, "region": "US", "value":
100})
+ await partitioned_writer.append({"id": 2, "region": "US", "value":
200})
+ # Append data to EU partition
+ await partitioned_writer.append({"id": 3, "region": "EU", "value":
300})
+ await partitioned_writer.append({"id": 4, "region": "EU", "value":
400})
+ partitioned_writer.flush()
+ print("\nWrote 4 records (2 to US, 2 to EU)")
+
+ # Demo: list_partition_offsets
+ print("\n--- Testing list_partition_offsets ---")
+
+ # Query offsets for US partition
+ # Note: partition_name is just the value (e.g., "US"), not "region=US"
+ us_offsets = await admin.list_partition_offsets(
+ partitioned_table_path,
+ partition_name="US",
+ bucket_ids=[0],
+ offset_type="latest"
+ )
+ print(f"US partition latest offsets: {us_offsets}")
+
+ # Query offsets for EU partition
+ eu_offsets = await admin.list_partition_offsets(
+ partitioned_table_path,
+ partition_name="EU",
+ bucket_ids=[0],
+ offset_type="latest"
+ )
+ print(f"EU partition latest offsets: {eu_offsets}")
+
+ # Demo: subscribe_partition for reading partitioned data
+ print("\n--- Testing subscribe_partition + to_arrow() ---")
+ partitioned_scanner = await
partitioned_table.new_scan().create_batch_scanner()
+
+ # Subscribe to each partition using partition_id
+ for p in partition_infos:
+ partitioned_scanner.subscribe_partition(
+ partition_id=p.partition_id,
+ bucket_id=0,
+ start_offset=fluss.EARLIEST_OFFSET
+ )
+ print(f"Subscribed to partition {p.partition_name}
(id={p.partition_id})")
+
+ # Use to_arrow() - now works for partitioned tables!
+ partitioned_arrow = partitioned_scanner.to_arrow()
+ print(f"\nto_arrow() returned {partitioned_arrow.num_rows} records
from partitioned table:")
+ print(partitioned_arrow.to_pandas())
+
+ # Demo: to_pandas() also works for partitioned tables
+ print("\n--- Testing to_pandas() on partitioned table ---")
+ partitioned_scanner2 = await
partitioned_table.new_scan().create_batch_scanner()
+ for p in partition_infos:
+ partitioned_scanner2.subscribe_partition(p.partition_id, 0,
fluss.EARLIEST_OFFSET)
+ partitioned_df = partitioned_scanner2.to_pandas()
+ print(f"to_pandas() returned {len(partitioned_df)} records:")
+ print(partitioned_df)
+
+ # Cleanup
+ await admin.drop_table(partitioned_table_path,
ignore_if_not_exists=True)
+ print(f"\nDropped partitioned table: {partitioned_table_path}")
+
+ except Exception as e:
+ print(f"Error with partitioned table: {e}")
+ traceback.print_exc()
+
# Close connection
conn.close()
print("\nConnection closed")
diff --git a/bindings/python/fluss/__init__.pyi b/bindings/python/fluss/__init__.pyi
index 40d18f6..a2bbaac 100644
--- a/bindings/python/fluss/__init__.pyi
+++ b/bindings/python/fluss/__init__.pyi
@@ -129,6 +129,78 @@ class FlussAdmin:
) -> None: ...
async def get_table(self, table_path: TablePath) -> TableInfo: ...
async def get_latest_lake_snapshot(self, table_path: TablePath) ->
LakeSnapshot: ...
+ async def drop_table(
+ self,
+ table_path: TablePath,
+ ignore_if_not_exists: bool = False,
+ ) -> None: ...
+ async def list_offsets(
+ self,
+ table_path: TablePath,
+ bucket_ids: List[int],
+ offset_type: str,
+ timestamp: Optional[int] = None,
+ ) -> Dict[int, int]:
+ """List offsets for the specified buckets.
+
+ Args:
+ table_path: Path to the table
+ bucket_ids: List of bucket IDs to query
+ offset_type: "earliest", "latest", or "timestamp"
+ timestamp: Required when offset_type is "timestamp"
+
+ Returns:
+ Dict mapping bucket_id -> offset
+ """
+ ...
+ async def list_partition_offsets(
+ self,
+ table_path: TablePath,
+ partition_name: str,
+ bucket_ids: List[int],
+ offset_type: str,
+ timestamp: Optional[int] = None,
+ ) -> Dict[int, int]:
+ """List offsets for buckets in a specific partition.
+
+ Args:
+ table_path: Path to the table
+ partition_name: Partition value (e.g., "US" not "region=US")
+ bucket_ids: List of bucket IDs to query
+ offset_type: "earliest", "latest", or "timestamp"
+ timestamp: Required when offset_type is "timestamp"
+
+ Returns:
+ Dict mapping bucket_id -> offset
+ """
+ ...
+ async def create_partition(
+ self,
+ table_path: TablePath,
+ partition_spec: Dict[str, str],
+ ignore_if_exists: bool = False,
+ ) -> None:
+ """Create a partition for a partitioned table.
+
+ Args:
+ table_path: Path to the table
+ partition_spec: Dict mapping partition column name to value (e.g.,
{"region": "US"})
+ ignore_if_exists: If True, don't raise error if partition already
exists
+ """
+ ...
+ async def list_partition_infos(
+ self,
+ table_path: TablePath,
+ ) -> List["PartitionInfo"]:
+ """List all partitions for a partitioned table.
+
+ Args:
+ table_path: Path to the table
+
+ Returns:
+ List of PartitionInfo objects
+ """
+ ...
def __repr__(self) -> str: ...
class TableScan:
@@ -322,14 +394,30 @@ class LogScanner:
scanner = await table.new_scan().project([0, 1]).create_log_scanner()
"""
- def subscribe(
- self, start_timestamp: Optional[int], end_timestamp: Optional[int]
+ def subscribe(self, bucket_id: int, start_offset: int) -> None:
+ """Subscribe to a single bucket at a specific offset (non-partitioned
tables).
+
+ Args:
+ bucket_id: The bucket ID to subscribe to
+ start_offset: The offset to start reading from (use
EARLIEST_OFFSET for beginning)
+ """
+ ...
+ def subscribe_buckets(self, bucket_offsets: Dict[int, int]) -> None:
+ """Subscribe to multiple buckets at specified offsets (non-partitioned
tables).
+
+ Args:
+ bucket_offsets: Dict mapping bucket_id -> start_offset
+ """
+ ...
+ def subscribe_partition(
+ self, partition_id: int, bucket_id: int, start_offset: int
) -> None:
- """Subscribe to log data with timestamp range.
+ """Subscribe to a bucket within a specific partition (partitioned
tables only).
Args:
- start_timestamp: Not yet supported, must be None.
- end_timestamp: Not yet supported, must be None.
+ partition_id: The partition ID (from PartitionInfo.partition_id)
+ bucket_id: The bucket ID within the partition
+ start_offset: The offset to start reading from (use
EARLIEST_OFFSET for beginning)
"""
...
def poll(self, timeout_ms: int) -> List[ScanRecord]:
@@ -384,12 +472,18 @@ class LogScanner:
"""Convert all data to Pandas DataFrame.
Requires a batch-based scanner (created with
new_scan().create_batch_scanner()).
+ Reads from currently subscribed buckets until reaching their latest
offsets.
+
+ You must call subscribe(), subscribe_buckets(), or
subscribe_partition() first.
"""
...
def to_arrow(self) -> pa.Table:
"""Convert all data to Arrow Table.
Requires a batch-based scanner (created with
new_scan().create_batch_scanner()).
+ Reads from currently subscribed buckets until reaching their latest
offsets.
+
+ You must call subscribe(), subscribe_buckets(), or
subscribe_partition() first.
"""
...
def __repr__(self) -> str: ...
@@ -493,4 +587,27 @@ class TableDistribution:
def bucket_keys(self) -> List[str]: ...
def bucket_count(self) -> Optional[int]: ...
+class PartitionInfo:
+ """Information about a partition."""
+
+ @property
+ def partition_id(self) -> int:
+ """Get the partition ID (globally unique in the cluster)."""
+ ...
+ @property
+ def partition_name(self) -> str:
+ """Get the partition name."""
+ ...
+ def __repr__(self) -> str: ...
+
+class OffsetType:
+ """Offset type constants for list_offsets()."""
+
+ EARLIEST: str
+ LATEST: str
+ TIMESTAMP: str
+
+# Constant for earliest offset (-2)
+EARLIEST_OFFSET: int
+
__version__: str
diff --git a/bindings/python/src/admin.rs b/bindings/python/src/admin.rs
index fa189eb..d28c9c0 100644
--- a/bindings/python/src/admin.rs
+++ b/bindings/python/src/admin.rs
@@ -16,6 +16,7 @@
// under the License.
use crate::*;
+use fcore::rpc::message::OffsetSpec;
use pyo3_async_runtimes::tokio::future_into_py;
use std::sync::Arc;
@@ -25,6 +26,37 @@ pub struct FlussAdmin {
__admin: Arc<fcore::client::FlussAdmin>,
}
+/// Parse offset_type string into OffsetSpec
+fn parse_offset_spec(offset_type: &str, timestamp: Option<i64>) ->
PyResult<OffsetSpec> {
+ match offset_type {
+ s if s.eq_ignore_ascii_case("earliest") => Ok(OffsetSpec::Earliest),
+ s if s.eq_ignore_ascii_case("latest") => Ok(OffsetSpec::Latest),
+ s if s.eq_ignore_ascii_case("timestamp") => {
+ let ts = timestamp.ok_or_else(|| {
+ FlussError::new_err("timestamp must be provided when
offset_type='timestamp'")
+ })?;
+ Ok(OffsetSpec::Timestamp(ts))
+ }
+ _ => Err(FlussError::new_err(format!(
+ "Invalid offset_type: '{}'. Must be 'earliest', 'latest', or
'timestamp'",
+ offset_type
+ ))),
+ }
+}
+
+/// Validate bucket IDs are non-negative
+fn validate_bucket_ids(bucket_ids: &[i32]) -> PyResult<()> {
+ for &bucket_id in bucket_ids {
+ if bucket_id < 0 {
+ return Err(FlussError::new_err(format!(
+ "Invalid bucket_id: {}. Bucket IDs must be non-negative",
+ bucket_id
+ )));
+ }
+ }
+ Ok(())
+}
+
#[pymethods]
impl FlussAdmin {
/// Create a table with the given schema
@@ -38,7 +70,7 @@ impl FlussAdmin {
) -> PyResult<Bound<'py, PyAny>> {
let ignore = ignore_if_exists.unwrap_or(false);
- let core_table_path = table_path.to_core().clone();
+ let core_table_path = table_path.to_core();
let core_descriptor = table_descriptor.to_core().clone();
let admin = self.__admin.clone();
@@ -58,7 +90,7 @@ impl FlussAdmin {
py: Python<'py>,
table_path: &TablePath,
) -> PyResult<Bound<'py, PyAny>> {
- let core_table_path = table_path.to_core().clone();
+ let core_table_path = table_path.to_core();
let admin = self.__admin.clone();
future_into_py(py, async move {
@@ -80,7 +112,7 @@ impl FlussAdmin {
py: Python<'py>,
table_path: &TablePath,
) -> PyResult<Bound<'py, PyAny>> {
- let core_table_path = table_path.to_core().clone();
+ let core_table_path = table_path.to_core();
let admin = self.__admin.clone();
future_into_py(py, async move {
@@ -96,6 +128,183 @@ impl FlussAdmin {
})
}
+ /// Drop a table
+ #[pyo3(signature = (table_path, ignore_if_not_exists=false))]
+ pub fn drop_table<'py>(
+ &self,
+ py: Python<'py>,
+ table_path: &TablePath,
+ ignore_if_not_exists: bool,
+ ) -> PyResult<Bound<'py, PyAny>> {
+ let core_table_path = table_path.to_core();
+ let admin = self.__admin.clone();
+
+ future_into_py(py, async move {
+ admin
+ .drop_table(&core_table_path, ignore_if_not_exists)
+ .await
+ .map_err(|e| FlussError::new_err(format!("Failed to drop
table: {e}")))?;
+
+ Python::attach(|py| Ok(py.None()))
+ })
+ }
+
+ /// List offsets for buckets (non-partitioned tables only).
+ ///
+ /// Args:
+ /// table_path: Path to the table
+ /// bucket_ids: List of bucket IDs to query
+ /// offset_type: Type of offset to retrieve:
+ /// - "earliest" or OffsetType.EARLIEST: Start of the log
+ /// - "latest" or OffsetType.LATEST: End of the log
+ /// - "timestamp" or OffsetType.TIMESTAMP: Offset at given
timestamp (requires timestamp arg)
+ /// timestamp: Required when offset_type is "timestamp", ignored
otherwise
+ ///
+ /// Returns:
+ /// dict[int, int]: Mapping of bucket_id -> offset
+ #[pyo3(signature = (table_path, bucket_ids, offset_type, timestamp=None))]
+ pub fn list_offsets<'py>(
+ &self,
+ py: Python<'py>,
+ table_path: &TablePath,
+ bucket_ids: Vec<i32>,
+ offset_type: &str,
+ timestamp: Option<i64>,
+ ) -> PyResult<Bound<'py, PyAny>> {
+ validate_bucket_ids(&bucket_ids)?;
+ let offset_spec = parse_offset_spec(offset_type, timestamp)?;
+
+ let core_table_path = table_path.to_core();
+ let admin = self.__admin.clone();
+
+ future_into_py(py, async move {
+ let offsets = admin
+ .list_offsets(&core_table_path, &bucket_ids, offset_spec)
+ .await
+ .map_err(|e| FlussError::new_err(format!("Failed to list
offsets: {e}")))?;
+
+ Python::attach(|py| {
+ let dict = pyo3::types::PyDict::new(py);
+ for (bucket_id, offset) in offsets {
+ dict.set_item(bucket_id, offset)?;
+ }
+ Ok(dict.unbind())
+ })
+ })
+ }
+
+ /// List offsets for buckets in a specific partition of a partitioned
table.
+ ///
+ /// Args:
+ /// table_path: Path to the table
+ /// partition_name: Partition value (e.g., "US" not "region=US")
+ /// bucket_ids: List of bucket IDs to query
+ /// offset_type: Type of offset to retrieve:
+ /// - "earliest" or OffsetType.EARLIEST: Start of the log
+ /// - "latest" or OffsetType.LATEST: End of the log
+ /// - "timestamp" or OffsetType.TIMESTAMP: Offset at given
timestamp (requires timestamp arg)
+ /// timestamp: Required when offset_type is "timestamp", ignored
otherwise
+ ///
+ /// Returns:
+ /// dict[int, int]: Mapping of bucket_id -> offset
+ #[pyo3(signature = (table_path, partition_name, bucket_ids, offset_type,
timestamp=None))]
+ pub fn list_partition_offsets<'py>(
+ &self,
+ py: Python<'py>,
+ table_path: &TablePath,
+ partition_name: &str,
+ bucket_ids: Vec<i32>,
+ offset_type: &str,
+ timestamp: Option<i64>,
+ ) -> PyResult<Bound<'py, PyAny>> {
+ validate_bucket_ids(&bucket_ids)?;
+ let offset_spec = parse_offset_spec(offset_type, timestamp)?;
+
+ let core_table_path = table_path.to_core();
+ let admin = self.__admin.clone();
+ let partition_name = partition_name.to_string();
+
+ future_into_py(py, async move {
+ let offsets = admin
+ .list_partition_offsets(&core_table_path, &partition_name,
&bucket_ids, offset_spec)
+ .await
+ .map_err(|e| {
+ FlussError::new_err(format!("Failed to list partition
offsets: {e}"))
+ })?;
+
+ Python::attach(|py| {
+ let dict = pyo3::types::PyDict::new(py);
+ for (bucket_id, offset) in offsets {
+ dict.set_item(bucket_id, offset)?;
+ }
+ Ok(dict.unbind())
+ })
+ })
+ }
+
+ /// Create a partition for a partitioned table.
+ ///
+ /// Args:
+ /// table_path: Path to the table
+ /// partition_spec: Dict mapping partition column name to value (e.g.,
{"region": "US"})
+ /// ignore_if_exists: If True, don't raise error if partition already
exists
+ ///
+ /// Returns:
+ /// None
+ #[pyo3(signature = (table_path, partition_spec, ignore_if_exists=false))]
+ pub fn create_partition<'py>(
+ &self,
+ py: Python<'py>,
+ table_path: &TablePath,
+ partition_spec: std::collections::HashMap<String, String>,
+ ignore_if_exists: bool,
+ ) -> PyResult<Bound<'py, PyAny>> {
+ let core_table_path = table_path.to_core();
+ let admin = self.__admin.clone();
+ let core_partition_spec =
fcore::metadata::PartitionSpec::new(partition_spec);
+
+ future_into_py(py, async move {
+ admin
+ .create_partition(&core_table_path, &core_partition_spec,
ignore_if_exists)
+ .await
+ .map_err(|e| FlussError::new_err(format!("Failed to create
partition: {e}")))?;
+
+ Python::attach(|py| Ok(py.None()))
+ })
+ }
+
+ /// List all partitions for a partitioned table.
+ ///
+ /// Args:
+ /// table_path: Path to the table
+ ///
+ /// Returns:
+ /// List[PartitionInfo]: List of partition info objects
+ pub fn list_partition_infos<'py>(
+ &self,
+ py: Python<'py>,
+ table_path: &TablePath,
+ ) -> PyResult<Bound<'py, PyAny>> {
+ let core_table_path = table_path.to_core();
+ let admin = self.__admin.clone();
+
+ future_into_py(py, async move {
+ let partition_infos = admin
+ .list_partition_infos(&core_table_path)
+ .await
+ .map_err(|e| FlussError::new_err(format!("Failed to list
partitions: {e}")))?;
+
+ Python::attach(|py| {
+ let py_list = pyo3::types::PyList::empty(py);
+ for info in partition_infos {
+ let py_info = PartitionInfo::from_core(info);
+ py_list.append(Py::new(py, py_info)?)?;
+ }
+ Ok(py_list.unbind())
+ })
+ })
+ }
+
fn __repr__(&self) -> String {
"FlussAdmin()".to_string()
}
@@ -109,3 +318,41 @@ impl FlussAdmin {
}
}
}
+
+/// Information about a partition
+#[pyclass]
+pub struct PartitionInfo {
+ partition_id: i64,
+ partition_name: String,
+}
+
+#[pymethods]
+impl PartitionInfo {
+ /// Get the partition ID (globally unique in the cluster)
+ #[getter]
+ fn partition_id(&self) -> i64 {
+ self.partition_id
+ }
+
+ /// Get the partition name (e.g., "US" for a table partitioned by region)
+ #[getter]
+ fn partition_name(&self) -> &str {
+ &self.partition_name
+ }
+
+ fn __repr__(&self) -> String {
+ format!(
+ "PartitionInfo(partition_id={}, partition_name='{}')",
+ self.partition_id, self.partition_name
+ )
+ }
+}
+
+impl PartitionInfo {
+ pub fn from_core(info: fcore::metadata::PartitionInfo) -> Self {
+ Self {
+ partition_id: info.get_partition_id(),
+ partition_name: info.get_partition_name(),
+ }
+ }
+}
diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs
index ce063ab..ae7f6c5 100644
--- a/bindings/python/src/lib.rs
+++ b/bindings/python/src/lib.rs
@@ -48,6 +48,23 @@ static TOKIO_RUNTIME: LazyLock<Runtime> = LazyLock::new(|| {
.expect("Failed to create Tokio runtime")
});
+/// Offset type constants for list_offsets()
+#[pyclass]
+#[derive(Clone)]
+pub struct OffsetType;
+
+#[pymethods]
+impl OffsetType {
+ #[classattr]
+ const EARLIEST: &'static str = "earliest";
+
+ #[classattr]
+ const LATEST: &'static str = "latest";
+
+ #[classattr]
+ const TIMESTAMP: &'static str = "timestamp";
+}
+
#[pymodule]
fn _fluss(m: &Bound<'_, PyModule>) -> PyResult<()> {
// Register all classes
@@ -69,6 +86,11 @@ fn _fluss(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<ChangeType>()?;
m.add_class::<ScanRecord>()?;
m.add_class::<RecordBatch>()?;
+ m.add_class::<PartitionInfo>()?;
+ m.add_class::<OffsetType>()?;
+
+ // Register constants
+ m.add("EARLIEST_OFFSET", fcore::client::EARLIEST_OFFSET)?;
// Register exception types
m.add_class::<FlussError>()?;
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index 30c7ce0..c285f25 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -20,7 +20,6 @@ use crate::*;
use arrow::array::RecordBatch as ArrowRecordBatch;
use arrow_pyarrow::{FromPyArrow, ToPyArrow};
use arrow_schema::SchemaRef;
-use fluss::client::EARLIEST_OFFSET;
use fluss::record::to_arrow_schema;
use fluss::rpc::message::OffsetSpec;
use pyo3::types::IntoPyDict;
@@ -1573,155 +1572,95 @@ pub struct LogScanner {
projected_schema: SchemaRef,
/// The projected row type to use for record-based scanning
projected_row_type: fcore::metadata::RowType,
- #[allow(dead_code)]
- start_timestamp: Option<i64>,
- #[allow(dead_code)]
- end_timestamp: Option<i64>,
+ /// Cache for partition_id -> partition_name mapping (avoids repeated
list_partition_infos calls)
+ partition_name_cache: std::sync::RwLock<Option<HashMap<i64, String>>>,
}
#[pymethods]
impl LogScanner {
- /// Subscribe to log data with timestamp range
- fn subscribe(
- &mut self,
- _start_timestamp: Option<i64>,
- _end_timestamp: Option<i64>,
- ) -> PyResult<()> {
- if _start_timestamp.is_some() {
- return Err(FlussError::new_err(
- "Specifying start_timestamp is not yet supported. Please use
None.".to_string(),
- ));
- }
- if _end_timestamp.is_some() {
- return Err(FlussError::new_err(
- "Specifying end_timestamp is not yet supported. Please use
None.".to_string(),
- ));
- }
-
- let num_buckets = self.table_info.get_num_buckets();
- for bucket_id in 0..num_buckets {
- let start_offset = EARLIEST_OFFSET;
-
- // Subscribe to the appropriate scanner
- if let Some(ref inner) = self.inner {
- TOKIO_RUNTIME.block_on(async {
+ /// Subscribe to a single bucket at a specific offset (non-partitioned
tables).
+ ///
+ /// Args:
+ /// bucket_id: The bucket ID to subscribe to
+ /// start_offset: The offset to start reading from (use
EARLIEST_OFFSET for beginning)
+ fn subscribe(&self, py: Python, bucket_id: i32, start_offset: i64) ->
PyResult<()> {
+ py.detach(|| {
+ TOKIO_RUNTIME.block_on(async {
+ if let Some(ref inner) = self.inner {
inner
.subscribe(bucket_id, start_offset)
.await
- .map_err(|e| FlussError::new_err(e.to_string()))
- })?;
- } else if let Some(ref inner_batch) = self.inner_batch {
- TOKIO_RUNTIME.block_on(async {
+ .map_err(|e| FlussError::new_err(format!("Failed to
subscribe: {e}")))
+ } else if let Some(ref inner_batch) = self.inner_batch {
inner_batch
.subscribe(bucket_id, start_offset)
.await
- .map_err(|e| FlussError::new_err(e.to_string()))
- })?;
- } else {
- return Err(FlussError::new_err("No scanner available"));
- }
- }
-
- Ok(())
+ .map_err(|e| FlussError::new_err(format!("Failed to
subscribe: {e}")))
+ } else {
+ Err(FlussError::new_err("No scanner available"))
+ }
+ })
+ })
}
- /// Convert all data to Arrow Table
+ /// Subscribe to multiple buckets at specified offsets (non-partitioned
tables).
///
- /// Note: Requires a batch-based scanner (created with
new_scan().create_batch_scanner()).
- fn to_arrow(&self, py: Python) -> PyResult<Py<PyAny>> {
- let inner_batch = self.inner_batch.as_ref().ok_or_else(|| {
- FlussError::new_err(
- "Batch-based scanner not available. Use
new_scan().create_batch_scanner() to create a scanner \
- that supports to_arrow().",
- )
- })?;
-
- let mut all_batches = Vec::new();
-
- let num_buckets = self.table_info.get_num_buckets();
- let bucket_ids: Vec<i32> = (0..num_buckets).collect();
-
- // todo: after supporting list_offsets with timestamp, we can use
start_timestamp and end_timestamp here
- let mut stopping_offsets: HashMap<i32, i64> = py
- .detach(|| {
- TOKIO_RUNTIME.block_on(async {
- self.admin
- .list_offsets(
- &self.table_info.table_path,
- bucket_ids.as_slice(),
- OffsetSpec::Latest,
- )
+ /// Args:
+ /// bucket_offsets: A dict mapping bucket_id -> start_offset
+ fn subscribe_buckets(&self, py: Python, bucket_offsets: HashMap<i32, i64>)
-> PyResult<()> {
+ py.detach(|| {
+ TOKIO_RUNTIME.block_on(async {
+ if let Some(ref inner) = self.inner {
+ inner
+ .subscribe_buckets(&bucket_offsets)
.await
- })
- })
- .map_err(|e| FlussError::new_err(e.to_string()))?;
-
- // Filter out buckets with no records to read (stop_at <= 0)
- stopping_offsets.retain(|_, &mut v| v > 0);
-
- while !stopping_offsets.is_empty() {
- let scan_batches = py
- .detach(|| {
- TOKIO_RUNTIME
- .block_on(async {
inner_batch.poll(Duration::from_millis(500)).await })
- })
- .map_err(|e| FlussError::new_err(e.to_string()))?;
-
- if scan_batches.is_empty() {
- continue;
- }
-
- for scan_batch in scan_batches {
- let bucket_id = scan_batch.bucket().bucket_id();
-
- // Check if this bucket is still being tracked; if not, ignore
the batch
- let Some(&stop_at) = stopping_offsets.get(&bucket_id) else {
- continue;
- };
-
- let base_offset = scan_batch.base_offset();
- let last_offset = scan_batch.last_offset();
-
- // If the batch starts at or after the stop_at offset, the
bucket is exhausted
- if base_offset >= stop_at {
- stopping_offsets.remove(&bucket_id);
- continue;
- }
-
- let batch = if last_offset >= stop_at {
- // This batch contains the target offset; slice it to keep
only records
- // where offset < stop_at.
- let num_to_keep = (stop_at - base_offset) as usize;
- let b = scan_batch.into_batch();
-
- // Safety check: ensure we don't attempt to slice more
rows than the batch contains
- let limit = num_to_keep.min(b.num_rows());
- b.slice(0, limit)
+ .map_err(|e| FlussError::new_err(format!("Failed to
subscribe batch: {e}")))
+ } else if let Some(ref inner_batch) = self.inner_batch {
+ inner_batch
+ .subscribe_buckets(&bucket_offsets)
+ .await
+ .map_err(|e| FlussError::new_err(format!("Failed to
subscribe batch: {e}")))
} else {
- // The entire batch is within the desired range (all
offsets < stop_at)
- scan_batch.into_batch()
- };
-
- all_batches.push(Arc::new(batch));
-
- // If the batch's last offset reached or passed the inclusive
limit (stop_at - 1),
- // we are done with this bucket.
- if last_offset >= stop_at - 1 {
- stopping_offsets.remove(&bucket_id);
+ Err(FlussError::new_err("No scanner available"))
}
- }
- }
-
- Utils::combine_batches_to_table(py, all_batches)
+ })
+ })
}
- /// Convert all data to Pandas DataFrame
- fn to_pandas(&self, py: Python) -> PyResult<Py<PyAny>> {
- let arrow_table = self.to_arrow(py)?;
-
- // Convert Arrow Table to Pandas DataFrame using pyarrow
- let df = arrow_table.call_method0(py, "to_pandas")?;
- Ok(df)
+ /// Subscribe to a bucket within a specific partition (partitioned tables
only).
+ ///
+ /// Args:
+ /// partition_id: The partition ID (from PartitionInfo.partition_id)
+ /// bucket_id: The bucket ID within the partition
+ /// start_offset: The offset to start reading from (use
EARLIEST_OFFSET for beginning)
+ fn subscribe_partition(
+ &self,
+ py: Python,
+ partition_id: i64,
+ bucket_id: i32,
+ start_offset: i64,
+ ) -> PyResult<()> {
+ py.detach(|| {
+ TOKIO_RUNTIME.block_on(async {
+ if let Some(ref inner) = self.inner {
+ inner
+ .subscribe_partition(partition_id, bucket_id,
start_offset)
+ .await
+ .map_err(|e| {
+ FlussError::new_err(format!("Failed to subscribe
partition: {e}"))
+ })
+ } else if let Some(ref inner_batch) = self.inner_batch {
+ inner_batch
+ .subscribe_partition(partition_id, bucket_id,
start_offset)
+ .await
+ .map_err(|e| {
+ FlussError::new_err(format!("Failed to subscribe
partition: {e}"))
+ })
+ } else {
+ Err(FlussError::new_err("No scanner available"))
+ }
+ })
+ })
}
/// Poll for individual records with metadata.
@@ -1873,6 +1812,54 @@ impl LogScanner {
Ok(empty_table.into())
}
+ /// Convert all data to Arrow Table.
+ ///
+ /// Reads from currently subscribed buckets until reaching their latest
offsets.
+ /// Works for both partitioned and non-partitioned tables.
+ ///
+ /// You must call subscribe(), subscribe_buckets(), or
subscribe_partition() first.
+ ///
+ /// Returns:
+ /// PyArrow Table containing all data from subscribed buckets
+ fn to_arrow(&self, py: Python) -> PyResult<Py<PyAny>> {
+ // 1. Get subscribed buckets from scanner (requires batch scanner for
get_subscribed_buckets)
+ let inner_batch = self.inner_batch.as_ref().ok_or_else(|| {
+ FlussError::new_err(
+ "Batch-based scanner not available. Use
new_scan().create_batch_scanner() to create a scanner \
+ that supports to_arrow().",
+ )
+ })?;
+ let subscribed = inner_batch.get_subscribed_buckets();
+ if subscribed.is_empty() {
+ return Err(FlussError::new_err(
+ "No buckets subscribed. Call subscribe(), subscribe_buckets(),
or subscribe_partition() first.",
+ ));
+ }
+
+ // 2. Query latest offsets for all subscribed buckets
+ let stopping_offsets = self.query_latest_offsets(py, &subscribed)?;
+
+ // 3. Poll until all buckets reach their stopping offsets
+ self.poll_until_offsets(py, stopping_offsets)
+ }
+
+ /// Convert all data to Pandas DataFrame.
+ ///
+ /// Reads from currently subscribed buckets until reaching their latest
offsets.
+ /// Works for both partitioned and non-partitioned tables.
+ ///
+ /// You must call subscribe(), subscribe_buckets(), or
subscribe_partition() first.
+ ///
+ /// Returns:
+ /// Pandas DataFrame containing all data from subscribed buckets
+ fn to_pandas(&self, py: Python) -> PyResult<Py<PyAny>> {
+ let arrow_table = self.to_arrow(py)?;
+
+ // Convert Arrow Table to Pandas DataFrame using pyarrow
+ let df = arrow_table.call_method0(py, "to_pandas")?;
+ Ok(df)
+ }
+
fn __repr__(&self) -> String {
format!("LogScanner(table={})", self.table_info.table_path)
}
@@ -1894,8 +1881,7 @@ impl LogScanner {
table_info,
projected_schema,
projected_row_type,
- start_timestamp: None,
- end_timestamp: None,
+ partition_name_cache: std::sync::RwLock::new(None),
}
}
@@ -1914,9 +1900,215 @@ impl LogScanner {
table_info,
projected_schema,
projected_row_type,
- start_timestamp: None,
- end_timestamp: None,
+ partition_name_cache: std::sync::RwLock::new(None),
+ }
+ }
+
+ /// Get partition_id -> partition_name mapping, using cache if available
+ fn get_partition_name_map(
+ &self,
+ py: Python,
+ table_path: &fcore::metadata::TablePath,
+ ) -> PyResult<HashMap<i64, String>> {
+ // Check cache first (read lock)
+ {
+ let cache = self.partition_name_cache.read().unwrap();
+ if let Some(map) = cache.as_ref() {
+ return Ok(map.clone());
+ }
+ }
+
+ // Fetch partition infos (releases GIL during async call)
+ let partition_infos: Vec<fcore::metadata::PartitionInfo> = py
+ .detach(|| {
+ TOKIO_RUNTIME.block_on(async {
self.admin.list_partition_infos(table_path).await })
+ })
+ .map_err(|e| FlussError::new_err(format!("Failed to list partition
infos: {e}")))?;
+
+ // Build and cache the mapping
+ let map: HashMap<i64, String> = partition_infos
+ .into_iter()
+ .map(|info| (info.get_partition_id(), info.get_partition_name()))
+ .collect();
+
+ // Store in cache (write lock)
+ {
+ let mut cache = self.partition_name_cache.write().unwrap();
+ *cache = Some(map.clone());
+ }
+
+ Ok(map)
+ }
+
+ /// Query latest offsets for subscribed buckets (handles both partitioned
and non-partitioned)
+ fn query_latest_offsets(
+ &self,
+ py: Python,
+ subscribed: &[(fcore::metadata::TableBucket, i64)],
+ ) -> PyResult<HashMap<fcore::metadata::TableBucket, i64>> {
+ let inner_batch = self.inner_batch.as_ref().ok_or_else(|| {
+ FlussError::new_err("Batch-based scanner required for this
operation")
+ })?;
+ let is_partitioned = inner_batch.is_partitioned();
+ let table_path = &self.table_info.table_path;
+
+ if !is_partitioned {
+ // Non-partitioned: simple case - just query all bucket IDs
+ let bucket_ids: Vec<i32> = subscribed.iter().map(|(tb, _)|
tb.bucket_id()).collect();
+
+ let offsets: HashMap<i32, i64> = py
+ .detach(|| {
+ TOKIO_RUNTIME.block_on(async {
+ self.admin
+ .list_offsets(table_path, &bucket_ids,
OffsetSpec::Latest)
+ .await
+ })
+ })
+ .map_err(|e| FlussError::new_err(format!("Failed to list
offsets: {e}")))?;
+
+ // Convert to TableBucket-keyed map
+ let table_id = self.table_info.table_id;
+ Ok(offsets
+ .into_iter()
+ .filter(|(_, offset)| *offset > 0)
+ .map(|(bucket_id, offset)| {
+ (
+ fcore::metadata::TableBucket::new(table_id, bucket_id),
+ offset,
+ )
+ })
+ .collect())
+ } else {
+ // Partitioned: need to query per partition
+ self.query_partitioned_offsets(py, subscribed)
+ }
+ }
+
+ /// Query offsets for partitioned table subscriptions
+ fn query_partitioned_offsets(
+ &self,
+ py: Python,
+ subscribed: &[(fcore::metadata::TableBucket, i64)],
+ ) -> PyResult<HashMap<fcore::metadata::TableBucket, i64>> {
+ let table_path = &self.table_info.table_path;
+
+ // Get partition_id -> partition_name mapping (cached)
+ let partition_id_to_name = self.get_partition_name_map(py,
table_path)?;
+
+ // Group subscribed buckets by partition_id
+ let mut by_partition: HashMap<i64, Vec<i32>> = HashMap::new();
+ for (tb, _) in subscribed {
+ if let Some(partition_id) = tb.partition_id() {
+ by_partition
+ .entry(partition_id)
+ .or_default()
+ .push(tb.bucket_id());
+ }
+ }
+
+ // Query offsets for each partition
+ let mut result: HashMap<fcore::metadata::TableBucket, i64> =
HashMap::new();
+ let table_id = self.table_info.table_id;
+
+ for (partition_id, bucket_ids) in by_partition {
+ let partition_name =
partition_id_to_name.get(&partition_id).ok_or_else(|| {
+ FlussError::new_err(format!("Unknown partition_id:
{partition_id}"))
+ })?;
+
+ let offsets: HashMap<i32, i64> = py
+ .detach(|| {
+ TOKIO_RUNTIME.block_on(async {
+ self.admin
+ .list_partition_offsets(
+ table_path,
+ partition_name,
+ &bucket_ids,
+ OffsetSpec::Latest,
+ )
+ .await
+ })
+ })
+ .map_err(|e| {
+ FlussError::new_err(format!(
+ "Failed to list offsets for partition
{partition_name}: {e}"
+ ))
+ })?;
+
+ for (bucket_id, offset) in offsets {
+ if offset > 0 {
+ let tb = fcore::metadata::TableBucket::new_with_partition(
+ table_id,
+ Some(partition_id),
+ bucket_id,
+ );
+ result.insert(tb, offset);
+ }
+ }
}
+
+ Ok(result)
+ }
+
+ /// Poll until all buckets reach their stopping offsets
+ fn poll_until_offsets(
+ &self,
+ py: Python,
+ mut stopping_offsets: HashMap<fcore::metadata::TableBucket, i64>,
+ ) -> PyResult<Py<PyAny>> {
+ let inner_batch = self.inner_batch.as_ref().ok_or_else(|| {
+ FlussError::new_err("Batch-based scanner required for this
operation")
+ })?;
+ let mut all_batches = Vec::new();
+
+ while !stopping_offsets.is_empty() {
+ let scan_batches = py
+ .detach(|| {
+ TOKIO_RUNTIME
+ .block_on(async {
inner_batch.poll(Duration::from_millis(500)).await })
+ })
+ .map_err(|e| FlussError::new_err(format!("Failed to poll:
{e}")))?;
+
+ if scan_batches.is_empty() {
+ continue;
+ }
+
+ for scan_batch in scan_batches {
+ let table_bucket = scan_batch.bucket().clone();
+
+ // Check if this bucket is still being tracked
+ let Some(&stop_at) = stopping_offsets.get(&table_bucket) else {
+ continue;
+ };
+
+ let base_offset = scan_batch.base_offset();
+ let last_offset = scan_batch.last_offset();
+
+ // If the batch starts at or after the stop_at offset, the
bucket is exhausted
+ if base_offset >= stop_at {
+ stopping_offsets.remove(&table_bucket);
+ continue;
+ }
+
+ let batch = if last_offset >= stop_at {
+ // Slice batch to keep only records where offset < stop_at
+ let num_to_keep = (stop_at - base_offset) as usize;
+ let b = scan_batch.into_batch();
+ let limit = num_to_keep.min(b.num_rows());
+ b.slice(0, limit)
+ } else {
+ scan_batch.into_batch()
+ };
+
+ all_batches.push(Arc::new(batch));
+
+ // Check if we're done with this bucket
+ if last_offset >= stop_at - 1 {
+ stopping_offsets.remove(&table_bucket);
+ }
+ }
+ }
+
+ Utils::combine_batches_to_table(py, all_batches)
}
}
diff --git a/crates/fluss/src/client/table/scanner.rs
b/crates/fluss/src/client/table/scanner.rs
index ef68fb4..d50f19e 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -514,6 +514,16 @@ impl RecordBatchLogScanner {
.subscribe_partition(partition_id, bucket, offset)
.await
}
+
+ /// Returns whether the table is partitioned
+ pub fn is_partitioned(&self) -> bool {
+ self.inner.is_partitioned_table
+ }
+
+ /// Returns all subscribed buckets with their current offsets
+ pub fn get_subscribed_buckets(&self) -> Vec<(TableBucket, i64)> {
+ self.inner.log_scanner_status.get_all_subscriptions()
+ }
}
struct LogFetcher {
@@ -1512,6 +1522,16 @@ impl LogScannerStatus {
result
}
+ /// Returns all subscribed buckets with their current offsets
+ pub fn get_all_subscriptions(&self) -> Vec<(TableBucket, i64)> {
+ let map = self.bucket_status_map.read();
+ let mut result = Vec::new();
+ map.for_each(|bucket, status| {
+ result.push((bucket.clone(), status.offset()));
+ });
+ result
+ }
+
/// Helper to get bucket status
fn get_status(&self, table_bucket: &TableBucket) ->
Option<Arc<BucketScanStatus>> {
let map = self.bucket_status_map.read();