This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 82f8bba chore: python binding cleanup and bit of refactor (#264)
82f8bba is described below
commit 82f8bba2b6f1df81065ec9dce42cb16cfb54a708
Author: Anton Borisov <[email protected]>
AuthorDate: Sat Feb 7 02:01:49 2026 +0000
chore: python binding cleanup and bit of refactor (#264)
---
bindings/python/src/table.rs | 207 ++++++++++++++++---------------------------
1 file changed, 78 insertions(+), 129 deletions(-)
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index c285f25..7184c8d 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -265,34 +265,29 @@ impl TableScan {
let (projected_schema, projected_row_type) =
calculate_projected_types(&table_info, projection_indices)?;
- let py_scanner = match scanner_type {
+ let scanner_kind = match scanner_type {
ScannerType::Record => {
- let rust_scanner = table_scan.create_log_scanner().map_err(|e| {
+ let s = table_scan.create_log_scanner().map_err(|e| {
FlussError::new_err(format!("Failed to create log scanner: {e}"))
})?;
- LogScanner::from_log_scanner(
- rust_scanner,
- admin,
- table_info,
- projected_schema,
- projected_row_type,
- )
+ ScannerKind::Record(s)
}
ScannerType::Batch => {
- let rust_scanner =
- table_scan.create_record_batch_log_scanner().map_err(|e| {
- FlussError::new_err(format!("Failed to create batch scanner: {e}"))
- })?;
- LogScanner::from_batch_scanner(
- rust_scanner,
- admin,
- table_info,
- projected_schema,
- projected_row_type,
- )
+ let s = table_scan.create_record_batch_log_scanner().map_err(|e| {
+ FlussError::new_err(format!("Failed to create batch scanner: {e}"))
+ })?;
+ ScannerKind::Batch(s)
}
};
+ let py_scanner = LogScanner::new(
+ scanner_kind,
+ admin,
+ table_info,
+ projected_schema,
+ projected_row_type,
+ );
+
Python::attach(|py| Py::new(py, py_scanner))
})
}
@@ -1555,6 +1550,44 @@ fn get_type_name(value: &Bound<PyAny>) -> String {
.unwrap_or_else(|_| "unknown".to_string())
}
+/// Wraps the two scanner variants so we never have an impossible state
+/// (both None or both Some).
+enum ScannerKind {
+ Record(fcore::client::LogScanner),
+ Batch(fcore::client::RecordBatchLogScanner),
+}
+
+impl ScannerKind {
+ fn as_record(&self) -> PyResult<&fcore::client::LogScanner> {
+ match self {
+ Self::Record(s) => Ok(s),
+ Self::Batch(_) => Err(FlussError::new_err(
+ "poll() requires a record-based scanner. Use new_scan().create_log_scanner().",
+ )),
+ }
+ }
+
+ fn as_batch(&self) -> PyResult<&fcore::client::RecordBatchLogScanner> {
+ match self {
+ Self::Batch(s) => Ok(s),
+ Self::Record(_) => Err(FlussError::new_err(
+ "This method requires a batch-based scanner. Use new_scan().create_batch_scanner().",
+ )),
+ }
+ }
+}
+
+/// Dispatch a method call to whichever scanner variant is active.
+/// Both `LogScanner` and `RecordBatchLogScanner` share the same subscribe interface.
+macro_rules! with_scanner {
+ ($scanner:expr, $method:ident($($arg:expr),*)) => {
+ match $scanner {
+ ScannerKind::Record(s) => s.$method($($arg),*).await,
+ ScannerKind::Batch(s) => s.$method($($arg),*).await,
+ }
+ };
+}
+
/// Scanner for reading log data from a Fluss table.
///
/// This scanner supports two modes:
@@ -1562,10 +1595,7 @@ fn get_type_name(value: &Bound<PyAny>) -> String {
/// - Batch-based scanning via `poll_arrow()` / `poll_batches()` - returns Arrow batches
#[pyclass]
pub struct LogScanner {
- /// Record-based scanner for poll()
- inner: Option<fcore::client::LogScanner>,
- /// Batch-based scanner for poll_arrow/poll_batches
- inner_batch: Option<fcore::client::RecordBatchLogScanner>,
+ scanner: ScannerKind,
admin: fcore::client::FlussAdmin,
table_info: fcore::metadata::TableInfo,
/// The projected Arrow schema to use for empty table creation
@@ -1586,19 +1616,8 @@ impl LogScanner {
fn subscribe(&self, py: Python, bucket_id: i32, start_offset: i64) -> PyResult<()> {
py.detach(|| {
TOKIO_RUNTIME.block_on(async {
- if let Some(ref inner) = self.inner {
- inner
- .subscribe(bucket_id, start_offset)
- .await
- .map_err(|e| FlussError::new_err(format!("Failed to subscribe: {e}")))
- } else if let Some(ref inner_batch) = self.inner_batch {
- inner_batch
- .subscribe(bucket_id, start_offset)
- .await
- .map_err(|e| FlussError::new_err(format!("Failed to subscribe: {e}")))
- } else {
- Err(FlussError::new_err("No scanner available"))
- }
+ with_scanner!(&self.scanner, subscribe(bucket_id, start_offset))
+ .map_err(|e| FlussError::new_err(e.to_string()))
})
})
}
@@ -1610,19 +1629,8 @@ impl LogScanner {
fn subscribe_buckets(&self, py: Python, bucket_offsets: HashMap<i32, i64>) -> PyResult<()> {
py.detach(|| {
TOKIO_RUNTIME.block_on(async {
- if let Some(ref inner) = self.inner {
- inner
- .subscribe_buckets(&bucket_offsets)
- .await
- .map_err(|e| FlussError::new_err(format!("Failed to subscribe batch: {e}")))
- } else if let Some(ref inner_batch) = self.inner_batch {
- inner_batch
- .subscribe_buckets(&bucket_offsets)
- .await
- .map_err(|e| FlussError::new_err(format!("Failed to subscribe batch: {e}")))
- } else {
- Err(FlussError::new_err("No scanner available"))
- }
+ with_scanner!(&self.scanner, subscribe_buckets(&bucket_offsets))
+ .map_err(|e| FlussError::new_err(e.to_string()))
})
})
}
@@ -1642,23 +1650,11 @@ impl LogScanner {
) -> PyResult<()> {
py.detach(|| {
TOKIO_RUNTIME.block_on(async {
- if let Some(ref inner) = self.inner {
- inner
- .subscribe_partition(partition_id, bucket_id, start_offset)
- .await
- .map_err(|e| {
- FlussError::new_err(format!("Failed to subscribe partition: {e}"))
- })
- } else if let Some(ref inner_batch) = self.inner_batch {
- inner_batch
- .subscribe_partition(partition_id, bucket_id, start_offset)
- .await
- .map_err(|e| {
- FlussError::new_err(format!("Failed to subscribe partition: {e}"))
- })
- } else {
- Err(FlussError::new_err("No scanner available"))
- }
+ with_scanner!(
+ &self.scanner,
+ subscribe_partition(partition_id, bucket_id, start_offset)
+ )
+ .map_err(|e| FlussError::new_err(e.to_string()))
})
})
}
@@ -1677,12 +1673,7 @@ impl LogScanner {
/// - Returns an empty list if no records are available
/// - When timeout expires, returns an empty list (NOT an error)
fn poll(&self, py: Python, timeout_ms: i64) -> PyResult<Vec<ScanRecord>> {
- let inner = self.inner.as_ref().ok_or_else(|| {
- FlussError::new_err(
- "Record-based scanner not available. Use new_scan().create_log_scanner() to create a scanner \
- that supports poll().",
- )
- })?;
+ let scanner = self.scanner.as_record()?;
if timeout_ms < 0 {
return Err(FlussError::new_err(format!(
@@ -1692,7 +1683,7 @@ impl LogScanner {
let timeout = Duration::from_millis(timeout_ms as u64);
let scan_records = py
- .detach(|| TOKIO_RUNTIME.block_on(async { inner.poll(timeout).await }))
+ .detach(|| TOKIO_RUNTIME.block_on(async { scanner.poll(timeout).await }))
.map_err(|e| FlussError::new_err(e.to_string()))?;
// Convert ScanRecords to Python ScanRecord list
@@ -1724,12 +1715,7 @@ impl LogScanner {
/// - Returns an empty list if no batches are available
/// - When timeout expires, returns an empty list (NOT an error)
fn poll_batches(&self, py: Python, timeout_ms: i64) -> PyResult<Vec<RecordBatch>> {
- let inner_batch = self.inner_batch.as_ref().ok_or_else(|| {
- FlussError::new_err(
- "Batch-based scanner not available. Use new_scan().create_batch_scanner() to create a scanner \
- that supports poll_batches().",
- )
- })?;
+ let scanner = self.scanner.as_batch()?;
if timeout_ms < 0 {
return Err(FlussError::new_err(format!(
@@ -1739,7 +1725,7 @@ impl LogScanner {
let timeout = Duration::from_millis(timeout_ms as u64);
let scan_batches = py
- .detach(|| TOKIO_RUNTIME.block_on(async { inner_batch.poll(timeout).await }))
+ .detach(|| TOKIO_RUNTIME.block_on(async { scanner.poll(timeout).await }))
.map_err(|e| FlussError::new_err(e.to_string()))?;
// Convert ScanBatch to RecordBatch with metadata
@@ -1764,12 +1750,7 @@ impl LogScanner {
/// - Returns an empty table (with correct schema) if no records are available
/// - When timeout expires, returns an empty table (NOT an error)
fn poll_arrow(&self, py: Python, timeout_ms: i64) -> PyResult<Py<PyAny>> {
- let inner_batch = self.inner_batch.as_ref().ok_or_else(|| {
- FlussError::new_err(
- "Batch-based scanner not available. Use new_scan().create_batch_scanner() to create a scanner \
- that supports poll_arrow().",
- )
- })?;
+ let scanner = self.scanner.as_batch()?;
if timeout_ms < 0 {
return Err(FlussError::new_err(format!(
@@ -1779,7 +1760,7 @@ impl LogScanner {
let timeout = Duration::from_millis(timeout_ms as u64);
let scan_batches = py
- .detach(|| TOKIO_RUNTIME.block_on(async { inner_batch.poll(timeout).await }))
+ .detach(|| TOKIO_RUNTIME.block_on(async { scanner.poll(timeout).await }))
.map_err(|e| FlussError::new_err(e.to_string()))?;
// Convert ScanBatch to Arrow batches
@@ -1822,14 +1803,8 @@ impl LogScanner {
/// Returns:
/// PyArrow Table containing all data from subscribed buckets
fn to_arrow(&self, py: Python) -> PyResult<Py<PyAny>> {
- // 1. Get subscribed buckets from scanner (requires batch scanner for get_subscribed_buckets)
- let inner_batch = self.inner_batch.as_ref().ok_or_else(|| {
- FlussError::new_err(
- "Batch-based scanner not available. Use new_scan().create_batch_scanner() to create a scanner \
- that supports to_arrow().",
- )
- })?;
- let subscribed = inner_batch.get_subscribed_buckets();
+ let scanner = self.scanner.as_batch()?;
+ let subscribed = scanner.get_subscribed_buckets();
if subscribed.is_empty() {
return Err(FlussError::new_err(
"No buckets subscribed. Call subscribe(), subscribe_buckets(), or subscribe_partition() first.",
@@ -1866,36 +1841,15 @@ impl LogScanner {
}
impl LogScanner {
- /// Create LogScanner for record-based scanning
- pub fn from_log_scanner(
- inner_scanner: fcore::client::LogScanner,
- admin: fcore::client::FlussAdmin,
- table_info: fcore::metadata::TableInfo,
- projected_schema: SchemaRef,
- projected_row_type: fcore::metadata::RowType,
- ) -> Self {
- Self {
- inner: Some(inner_scanner),
- inner_batch: None,
- admin,
- table_info,
- projected_schema,
- projected_row_type,
- partition_name_cache: std::sync::RwLock::new(None),
- }
- }
-
- /// Create LogScanner for batch-based scanning
- pub fn from_batch_scanner(
- inner_batch_scanner: fcore::client::RecordBatchLogScanner,
+ fn new(
+ scanner: ScannerKind,
admin: fcore::client::FlussAdmin,
table_info: fcore::metadata::TableInfo,
projected_schema: SchemaRef,
projected_row_type: fcore::metadata::RowType,
) -> Self {
Self {
- inner: None,
- inner_batch: Some(inner_batch_scanner),
+ scanner,
admin,
table_info,
projected_schema,
@@ -1946,10 +1900,8 @@ impl LogScanner {
py: Python,
subscribed: &[(fcore::metadata::TableBucket, i64)],
) -> PyResult<HashMap<fcore::metadata::TableBucket, i64>> {
- let inner_batch = self.inner_batch.as_ref().ok_or_else(|| {
- FlussError::new_err("Batch-based scanner required for this operation")
- })?;
- let is_partitioned = inner_batch.is_partitioned();
+ let scanner = self.scanner.as_batch()?;
+ let is_partitioned = scanner.is_partitioned();
let table_path = &self.table_info.table_path;
if !is_partitioned {
@@ -2055,16 +2007,13 @@ impl LogScanner {
py: Python,
mut stopping_offsets: HashMap<fcore::metadata::TableBucket, i64>,
) -> PyResult<Py<PyAny>> {
- let inner_batch = self.inner_batch.as_ref().ok_or_else(|| {
- FlussError::new_err("Batch-based scanner required for this operation")
- })?;
+ let scanner = self.scanner.as_batch()?;
let mut all_batches = Vec::new();
while !stopping_offsets.is_empty() {
let scan_batches = py
.detach(|| {
- TOKIO_RUNTIME
- .block_on(async { inner_batch.poll(Duration::from_millis(500)).await })
+ TOKIO_RUNTIME.block_on(async { scanner.poll(Duration::from_millis(500)).await })
})
.map_err(|e| FlussError::new_err(format!("Failed to poll: {e}")))?;