This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 82f8bba  chore: python binding cleanup and bit of refactor (#264)
82f8bba is described below

commit 82f8bba2b6f1df81065ec9dce42cb16cfb54a708
Author: Anton Borisov <[email protected]>
AuthorDate: Sat Feb 7 02:01:49 2026 +0000

    chore: python binding cleanup and bit of refactor (#264)
---
 bindings/python/src/table.rs | 207 ++++++++++++++++---------------------------
 1 file changed, 78 insertions(+), 129 deletions(-)

diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index c285f25..7184c8d 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -265,34 +265,29 @@ impl TableScan {
             let (projected_schema, projected_row_type) =
                 calculate_projected_types(&table_info, projection_indices)?;
 
-            let py_scanner = match scanner_type {
+            let scanner_kind = match scanner_type {
                 ScannerType::Record => {
-                    let rust_scanner = table_scan.create_log_scanner().map_err(|e| {
+                    let s = table_scan.create_log_scanner().map_err(|e| {
                         FlussError::new_err(format!("Failed to create log scanner: {e}"))
                     })?;
-                    LogScanner::from_log_scanner(
-                        rust_scanner,
-                        admin,
-                        table_info,
-                        projected_schema,
-                        projected_row_type,
-                    )
+                    ScannerKind::Record(s)
                 }
                 ScannerType::Batch => {
-                    let rust_scanner =
-                        table_scan.create_record_batch_log_scanner().map_err(|e| {
-                            FlussError::new_err(format!("Failed to create batch scanner: {e}"))
-                        })?;
-                    LogScanner::from_batch_scanner(
-                        rust_scanner,
-                        admin,
-                        table_info,
-                        projected_schema,
-                        projected_row_type,
-                    )
+                    let s = table_scan.create_record_batch_log_scanner().map_err(|e| {
+                        FlussError::new_err(format!("Failed to create batch scanner: {e}"))
+                    })?;
+                    ScannerKind::Batch(s)
                 }
             };
 
+            let py_scanner = LogScanner::new(
+                scanner_kind,
+                admin,
+                table_info,
+                projected_schema,
+                projected_row_type,
+            );
+
             Python::attach(|py| Py::new(py, py_scanner))
         })
     }
@@ -1555,6 +1550,44 @@ fn get_type_name(value: &Bound<PyAny>) -> String {
         .unwrap_or_else(|_| "unknown".to_string())
 }
 
+/// Wraps the two scanner variants so we never have an impossible state
+/// (both None or both Some).
+enum ScannerKind {
+    Record(fcore::client::LogScanner),
+    Batch(fcore::client::RecordBatchLogScanner),
+}
+
+impl ScannerKind {
+    fn as_record(&self) -> PyResult<&fcore::client::LogScanner> {
+        match self {
+            Self::Record(s) => Ok(s),
+            Self::Batch(_) => Err(FlussError::new_err(
+                "poll() requires a record-based scanner. Use new_scan().create_log_scanner().",
+            )),
+        }
+    }
+
+    fn as_batch(&self) -> PyResult<&fcore::client::RecordBatchLogScanner> {
+        match self {
+            Self::Batch(s) => Ok(s),
+            Self::Record(_) => Err(FlussError::new_err(
+                "This method requires a batch-based scanner. Use new_scan().create_batch_scanner().",
+            )),
+        }
+    }
+}
+
+/// Dispatch a method call to whichever scanner variant is active.
+/// Both `LogScanner` and `RecordBatchLogScanner` share the same subscribe interface.
+macro_rules! with_scanner {
+    ($scanner:expr, $method:ident($($arg:expr),*)) => {
+        match $scanner {
+            ScannerKind::Record(s) => s.$method($($arg),*).await,
+            ScannerKind::Batch(s) => s.$method($($arg),*).await,
+        }
+    };
+}
+
 /// Scanner for reading log data from a Fluss table.
 ///
 /// This scanner supports two modes:
@@ -1562,10 +1595,7 @@ fn get_type_name(value: &Bound<PyAny>) -> String {
 /// - Batch-based scanning via `poll_arrow()` / `poll_batches()` - returns Arrow batches
 #[pyclass]
 pub struct LogScanner {
-    /// Record-based scanner for poll()
-    inner: Option<fcore::client::LogScanner>,
-    /// Batch-based scanner for poll_arrow/poll_batches
-    inner_batch: Option<fcore::client::RecordBatchLogScanner>,
+    scanner: ScannerKind,
     admin: fcore::client::FlussAdmin,
     table_info: fcore::metadata::TableInfo,
     /// The projected Arrow schema to use for empty table creation
@@ -1586,19 +1616,8 @@ impl LogScanner {
     fn subscribe(&self, py: Python, bucket_id: i32, start_offset: i64) -> PyResult<()> {
         py.detach(|| {
             TOKIO_RUNTIME.block_on(async {
-                if let Some(ref inner) = self.inner {
-                    inner
-                        .subscribe(bucket_id, start_offset)
-                        .await
-                        .map_err(|e| FlussError::new_err(format!("Failed to subscribe: {e}")))
-                } else if let Some(ref inner_batch) = self.inner_batch {
-                    inner_batch
-                        .subscribe(bucket_id, start_offset)
-                        .await
-                        .map_err(|e| FlussError::new_err(format!("Failed to subscribe: {e}")))
-                } else {
-                    Err(FlussError::new_err("No scanner available"))
-                }
+                with_scanner!(&self.scanner, subscribe(bucket_id, start_offset))
+                    .map_err(|e| FlussError::new_err(e.to_string()))
             })
         })
     }
@@ -1610,19 +1629,8 @@ impl LogScanner {
     fn subscribe_buckets(&self, py: Python, bucket_offsets: HashMap<i32, i64>) -> PyResult<()> {
         py.detach(|| {
             TOKIO_RUNTIME.block_on(async {
-                if let Some(ref inner) = self.inner {
-                    inner
-                        .subscribe_buckets(&bucket_offsets)
-                        .await
-                        .map_err(|e| FlussError::new_err(format!("Failed to subscribe batch: {e}")))
-                } else if let Some(ref inner_batch) = self.inner_batch {
-                    inner_batch
-                        .subscribe_buckets(&bucket_offsets)
-                        .await
-                        .map_err(|e| FlussError::new_err(format!("Failed to subscribe batch: {e}")))
-                } else {
-                    Err(FlussError::new_err("No scanner available"))
-                }
+                with_scanner!(&self.scanner, subscribe_buckets(&bucket_offsets))
+                    .map_err(|e| FlussError::new_err(e.to_string()))
             })
         })
     }
@@ -1642,23 +1650,11 @@ impl LogScanner {
     ) -> PyResult<()> {
         py.detach(|| {
             TOKIO_RUNTIME.block_on(async {
-                if let Some(ref inner) = self.inner {
-                    inner
-                        .subscribe_partition(partition_id, bucket_id, start_offset)
-                        .await
-                        .map_err(|e| {
-                            FlussError::new_err(format!("Failed to subscribe partition: {e}"))
-                        })
-                } else if let Some(ref inner_batch) = self.inner_batch {
-                    inner_batch
-                        .subscribe_partition(partition_id, bucket_id, start_offset)
-                        .await
-                        .map_err(|e| {
-                            FlussError::new_err(format!("Failed to subscribe partition: {e}"))
-                        })
-                } else {
-                    Err(FlussError::new_err("No scanner available"))
-                }
+                with_scanner!(
+                    &self.scanner,
+                    subscribe_partition(partition_id, bucket_id, start_offset)
+                )
+                .map_err(|e| FlussError::new_err(e.to_string()))
             })
         })
     }
@@ -1677,12 +1673,7 @@ impl LogScanner {
     ///     - Returns an empty list if no records are available
     ///     - When timeout expires, returns an empty list (NOT an error)
     fn poll(&self, py: Python, timeout_ms: i64) -> PyResult<Vec<ScanRecord>> {
-        let inner = self.inner.as_ref().ok_or_else(|| {
-            FlussError::new_err(
-                "Record-based scanner not available. Use new_scan().create_log_scanner() to create a scanner \
-                 that supports poll().",
-            )
-        })?;
+        let scanner = self.scanner.as_record()?;
 
         if timeout_ms < 0 {
             return Err(FlussError::new_err(format!(
@@ -1692,7 +1683,7 @@ impl LogScanner {
 
         let timeout = Duration::from_millis(timeout_ms as u64);
         let scan_records = py
-            .detach(|| TOKIO_RUNTIME.block_on(async { inner.poll(timeout).await }))
+            .detach(|| TOKIO_RUNTIME.block_on(async { scanner.poll(timeout).await }))
             .map_err(|e| FlussError::new_err(e.to_string()))?;
 
         // Convert ScanRecords to Python ScanRecord list
@@ -1724,12 +1715,7 @@ impl LogScanner {
     ///     - Returns an empty list if no batches are available
     ///     - When timeout expires, returns an empty list (NOT an error)
     fn poll_batches(&self, py: Python, timeout_ms: i64) -> PyResult<Vec<RecordBatch>> {
-        let inner_batch = self.inner_batch.as_ref().ok_or_else(|| {
-            FlussError::new_err(
-                "Batch-based scanner not available. Use new_scan().create_batch_scanner() to create a scanner \
-                 that supports poll_batches().",
-            )
-        })?;
+        let scanner = self.scanner.as_batch()?;
 
         if timeout_ms < 0 {
             return Err(FlussError::new_err(format!(
@@ -1739,7 +1725,7 @@ impl LogScanner {
 
         let timeout = Duration::from_millis(timeout_ms as u64);
         let scan_batches = py
-            .detach(|| TOKIO_RUNTIME.block_on(async { inner_batch.poll(timeout).await }))
+            .detach(|| TOKIO_RUNTIME.block_on(async { scanner.poll(timeout).await }))
             .map_err(|e| FlussError::new_err(e.to_string()))?;
 
         // Convert ScanBatch to RecordBatch with metadata
@@ -1764,12 +1750,7 @@ impl LogScanner {
     ///     - Returns an empty table (with correct schema) if no records are available
     ///     - When timeout expires, returns an empty table (NOT an error)
     fn poll_arrow(&self, py: Python, timeout_ms: i64) -> PyResult<Py<PyAny>> {
-        let inner_batch = self.inner_batch.as_ref().ok_or_else(|| {
-            FlussError::new_err(
-                "Batch-based scanner not available. Use new_scan().create_batch_scanner() to create a scanner \
-                 that supports poll_arrow().",
-            )
-        })?;
+        let scanner = self.scanner.as_batch()?;
 
         if timeout_ms < 0 {
             return Err(FlussError::new_err(format!(
@@ -1779,7 +1760,7 @@ impl LogScanner {
 
         let timeout = Duration::from_millis(timeout_ms as u64);
         let scan_batches = py
-            .detach(|| TOKIO_RUNTIME.block_on(async { inner_batch.poll(timeout).await }))
+            .detach(|| TOKIO_RUNTIME.block_on(async { scanner.poll(timeout).await }))
             .map_err(|e| FlussError::new_err(e.to_string()))?;
 
         // Convert ScanBatch to Arrow batches
@@ -1822,14 +1803,8 @@ impl LogScanner {
     /// Returns:
     ///     PyArrow Table containing all data from subscribed buckets
     fn to_arrow(&self, py: Python) -> PyResult<Py<PyAny>> {
-        // 1. Get subscribed buckets from scanner (requires batch scanner for get_subscribed_buckets)
-        let inner_batch = self.inner_batch.as_ref().ok_or_else(|| {
-            FlussError::new_err(
-                "Batch-based scanner not available. Use new_scan().create_batch_scanner() to create a scanner \
-                 that supports to_arrow().",
-            )
-        })?;
-        let subscribed = inner_batch.get_subscribed_buckets();
+        let scanner = self.scanner.as_batch()?;
+        let subscribed = scanner.get_subscribed_buckets();
         if subscribed.is_empty() {
             return Err(FlussError::new_err(
                 "No buckets subscribed. Call subscribe(), subscribe_buckets(), or subscribe_partition() first.",
@@ -1866,36 +1841,15 @@ impl LogScanner {
 }
 
 impl LogScanner {
-    /// Create LogScanner for record-based scanning
-    pub fn from_log_scanner(
-        inner_scanner: fcore::client::LogScanner,
-        admin: fcore::client::FlussAdmin,
-        table_info: fcore::metadata::TableInfo,
-        projected_schema: SchemaRef,
-        projected_row_type: fcore::metadata::RowType,
-    ) -> Self {
-        Self {
-            inner: Some(inner_scanner),
-            inner_batch: None,
-            admin,
-            table_info,
-            projected_schema,
-            projected_row_type,
-            partition_name_cache: std::sync::RwLock::new(None),
-        }
-    }
-
-    /// Create LogScanner for batch-based scanning
-    pub fn from_batch_scanner(
-        inner_batch_scanner: fcore::client::RecordBatchLogScanner,
+    fn new(
+        scanner: ScannerKind,
         admin: fcore::client::FlussAdmin,
         table_info: fcore::metadata::TableInfo,
         projected_schema: SchemaRef,
         projected_row_type: fcore::metadata::RowType,
     ) -> Self {
         Self {
-            inner: None,
-            inner_batch: Some(inner_batch_scanner),
+            scanner,
             admin,
             table_info,
             projected_schema,
@@ -1946,10 +1900,8 @@ impl LogScanner {
         py: Python,
         subscribed: &[(fcore::metadata::TableBucket, i64)],
     ) -> PyResult<HashMap<fcore::metadata::TableBucket, i64>> {
-        let inner_batch = self.inner_batch.as_ref().ok_or_else(|| {
-            FlussError::new_err("Batch-based scanner required for this operation")
-        })?;
-        let is_partitioned = inner_batch.is_partitioned();
+        let scanner = self.scanner.as_batch()?;
+        let is_partitioned = scanner.is_partitioned();
         let table_path = &self.table_info.table_path;
 
         if !is_partitioned {
@@ -2055,16 +2007,13 @@ impl LogScanner {
         py: Python,
         mut stopping_offsets: HashMap<fcore::metadata::TableBucket, i64>,
     ) -> PyResult<Py<PyAny>> {
-        let inner_batch = self.inner_batch.as_ref().ok_or_else(|| {
-            FlussError::new_err("Batch-based scanner required for this operation")
-        })?;
+        let scanner = self.scanner.as_batch()?;
         let mut all_batches = Vec::new();
 
         while !stopping_offsets.is_empty() {
             let scan_batches = py
                 .detach(|| {
-                    TOKIO_RUNTIME
-                        .block_on(async { inner_batch.poll(Duration::from_millis(500)).await })
+                    TOKIO_RUNTIME.block_on(async { scanner.poll(Duration::from_millis(500)).await })
                 })
                 .map_err(|e| FlussError::new_err(format!("Failed to poll: {e}")))?;
 

Reply via email to