This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 7cc7bf6  chore: Add blocking poll into python bindings (#154)
7cc7bf6 is described below

commit 7cc7bf6610b511a90c6ecd222f6dcf721d76f845
Author: Anton Borisov <[email protected]>
AuthorDate: Tue Jan 27 02:37:13 2026 +0100

    chore: Add blocking poll into python bindings (#154)
---
 bindings/python/example/example.py |  22 +++++
 bindings/python/src/table.rs       | 175 ++++++++++++++++++++++++++-----------
 2 files changed, 145 insertions(+), 52 deletions(-)

diff --git a/bindings/python/example/example.py b/bindings/python/example/example.py
index 730416b..5d0302e 100644
--- a/bindings/python/example/example.py
+++ b/bindings/python/example/example.py
@@ -224,6 +224,28 @@ async def main():
 
         # TODO: support to_duckdb()
 
+        # Test the new poll() method for incremental reading
+        print("\n--- Testing poll() method ---")
+        # Reset subscription to start from the beginning
+        log_scanner.subscribe(None, None)
+
+        # Poll with a timeout of 5000ms (5 seconds)
+        # Note: poll() returns an empty table (not an error) on timeout
+        try:
+            poll_result = log_scanner.poll(5000)
+            print(f"Number of rows: {poll_result.num_rows}")
+
+            if poll_result.num_rows > 0:
+                poll_df = poll_result.to_pandas()
+                print(f"Polled data:\n{poll_df}")
+            else:
+                print("Empty result (no records available)")
+                # Empty table still has schema
+                print(f"Schema: {poll_result.schema}")
+
+        except Exception as e:
+            print(f"Error during poll: {e}")
+
     except Exception as e:
         print(f"Error during scanning: {e}")
 
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index 0ae7186..4043350 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -20,10 +20,13 @@ use crate::*;
 use arrow::array::RecordBatch;
 use arrow_pyarrow::{FromPyArrow, ToPyArrow};
 use fluss::client::EARLIEST_OFFSET;
+use fluss::record::to_arrow_schema;
 use fluss::rpc::message::OffsetSpec;
 use pyo3::types::IntoPyDict;
 use pyo3_async_runtimes::tokio::future_into_py;
+use std::collections::HashMap;
 use std::sync::Arc;
+use std::time::Duration;
 
 // Time conversion constants
 const MILLIS_PER_SECOND: i64 = 1_000;
@@ -186,7 +189,7 @@ impl FlussTable {
             }
 
             let rust_scanner = table_scan
-                .create_log_scanner()
+                .create_record_batch_log_scanner()
                 .map_err(|e| FlussError::new_err(format!("Failed to create log scanner: {e}")))?;
 
             let admin = conn
@@ -886,7 +889,7 @@ fn get_type_name(value: &Bound<PyAny>) -> String {
 /// Scanner for reading log data from a Fluss table
 #[pyclass]
 pub struct LogScanner {
-    inner: fcore::client::LogScanner,
+    inner: fcore::client::RecordBatchLogScanner,
     admin: fcore::client::FlussAdmin,
     table_info: fcore::metadata::TableInfo,
     #[allow(dead_code)]
@@ -931,63 +934,78 @@ impl LogScanner {
 
     /// Convert all data to Arrow Table
     fn to_arrow(&self, py: Python) -> PyResult<Py<PyAny>> {
-        use std::collections::HashMap;
-        use std::time::Duration;
-
         let mut all_batches = Vec::new();
 
         let num_buckets = self.table_info.get_num_buckets();
         let bucket_ids: Vec<i32> = (0..num_buckets).collect();
 
         // todo: after supporting list_offsets with timestamp, we can use start_timestamp and end_timestamp here
-        let mut stopping_offsets: HashMap<i32, i64> = TOKIO_RUNTIME
-            .block_on(async {
-                self.admin
-                    .list_offsets(
-                        &self.table_info.table_path,
-                        bucket_ids.as_slice(),
-                        OffsetSpec::Latest,
-                    )
-                    .await
+        let mut stopping_offsets: HashMap<i32, i64> = py
+            .detach(|| {
+                TOKIO_RUNTIME.block_on(async {
+                    self.admin
+                        .list_offsets(
+                            &self.table_info.table_path,
+                            bucket_ids.as_slice(),
+                            OffsetSpec::Latest,
+                        )
+                        .await
+                })
             })
             .map_err(|e| FlussError::new_err(e.to_string()))?;
 
-        if !stopping_offsets.is_empty() {
-            loop {
-                let batch_result = TOKIO_RUNTIME
-                    .block_on(async { self.inner.poll(Duration::from_millis(500)).await });
-
-                match batch_result {
-                    Ok(scan_records) => {
-                        let mut result_records: Vec<fcore::record::ScanRecord> = vec![];
-                        for (bucket, records) in scan_records.into_records_by_buckets() {
-                            let stopping_offset = stopping_offsets.get(&bucket.bucket_id());
-
-                            if stopping_offset.is_none() {
-                                // not to include this bucket, skip records for this bucket
-                                // since we already reach end offset for this bucket
-                                continue;
-                            }
-                            if let Some(last_record) = records.last() {
-                                let offset = last_record.offset();
-                                result_records.extend(records);
-                                if offset >= stopping_offset.unwrap() - 1 {
-                                    stopping_offsets.remove(&bucket.bucket_id());
-                                }
-                            }
-                        }
-
-                        if !result_records.is_empty() {
-                            let arrow_batch = Utils::convert_scan_records_to_arrow(result_records);
-                            all_batches.extend(arrow_batch);
-                        }
-
-                        // we have reach end offsets of all bucket
-                        if stopping_offsets.is_empty() {
-                            break;
-                        }
-                    }
-                    Err(e) => return Err(FlussError::new_err(e.to_string())),
+        // Filter out buckets with no records to read (stop_at <= 0)
+        stopping_offsets.retain(|_, &mut v| v > 0);
+
+        while !stopping_offsets.is_empty() {
+            let scan_batches = py
+                .detach(|| {
+                    TOKIO_RUNTIME
+                        .block_on(async { self.inner.poll(Duration::from_millis(500)).await })
+                })
+                .map_err(|e| FlussError::new_err(e.to_string()))?;
+
+            if scan_batches.is_empty() {
+                continue;
+            }
+
+            for scan_batch in scan_batches {
+                let bucket_id = scan_batch.bucket().bucket_id();
+
+                // Check if this bucket is still being tracked; if not, ignore the batch
+                let Some(&stop_at) = stopping_offsets.get(&bucket_id) else {
+                    continue;
+                };
+
+                let base_offset = scan_batch.base_offset();
+                let last_offset = scan_batch.last_offset();
+
+                // If the batch starts at or after the stop_at offset, the bucket is exhausted
+                if base_offset >= stop_at {
+                    stopping_offsets.remove(&bucket_id);
+                    continue;
+                }
+
+                let batch = if last_offset >= stop_at {
+                    // This batch contains the target offset; slice it to keep only records
+                    // where offset < stop_at.
+                    let num_to_keep = (stop_at - base_offset) as usize;
+                    let b = scan_batch.into_batch();
+
+                    // Safety check: ensure we don't attempt to slice more rows than the batch contains
+                    let limit = num_to_keep.min(b.num_rows());
+                    b.slice(0, limit)
+                } else {
+                    // The entire batch is within the desired range (all offsets < stop_at)
+                    scan_batch.into_batch()
+                };
+
+                all_batches.push(Arc::new(batch));
+
+                // If the batch's last offset reached or passed the inclusive limit (stop_at - 1),
+                // we are done with this bucket.
+                if last_offset >= stop_at - 1 {
+                    stopping_offsets.remove(&bucket_id);
                 }
             }
         }
@@ -1004,15 +1022,68 @@ impl LogScanner {
         Ok(df)
     }
 
+    /// Poll for new records with the specified timeout
+    ///
+    /// Args:
+    ///     timeout_ms: Timeout in milliseconds to wait for records
+    ///
+    /// Returns:
+    ///     PyArrow Table containing the polled records
+    ///
+    /// Note:
+    ///     - Returns an empty table (with correct schema) if no records are available
+    ///     - When timeout expires, returns an empty table (NOT an error)
+    fn poll(&self, py: Python, timeout_ms: i64) -> PyResult<Py<PyAny>> {
+        if timeout_ms < 0 {
+            return Err(FlussError::new_err(format!(
+                "timeout_ms must be non-negative, got: {timeout_ms}"
+            )));
+        }
+
+        let timeout = Duration::from_millis(timeout_ms as u64);
+        let scan_batches = py
+            .detach(|| TOKIO_RUNTIME.block_on(async { self.inner.poll(timeout).await }))
+            .map_err(|e| FlussError::new_err(e.to_string()))?;
+
+        // Convert ScanBatch to Arrow batches
+        if scan_batches.is_empty() {
+            return self.create_empty_table(py);
+        }
+
+        let arrow_batches: Vec<_> = scan_batches
+            .into_iter()
+            .map(|scan_batch| Arc::new(scan_batch.into_batch()))
+            .collect();
+
+        Utils::combine_batches_to_table(py, arrow_batches)
+    }
+
+    /// Create an empty PyArrow table with the correct schema
+    fn create_empty_table(&self, py: Python) -> PyResult<Py<PyAny>> {
+        let arrow_schema = to_arrow_schema(self.table_info.get_row_type())
+            .map_err(|e| FlussError::new_err(format!("Failed to get arrow schema: {e}")))?;
+        let py_schema = arrow_schema
+            .as_ref()
+            .to_pyarrow(py)
+            .map_err(|e| FlussError::new_err(format!("Failed to convert schema: {e}")))?;
+
+        let pyarrow = py.import("pyarrow")?;
+        let empty_table = pyarrow
+            .getattr("Table")?
+            .call_method1("from_batches", (vec![] as Vec<Py<PyAny>>, py_schema))?;
+
+        Ok(empty_table.into())
+    }
+
     fn __repr__(&self) -> String {
         format!("LogScanner(table={})", self.table_info.table_path)
     }
 }
 
 impl LogScanner {
-    /// Create LogScanner from core LogScanner
+    /// Create LogScanner from core RecordBatchLogScanner
     pub fn from_core(
-        inner_scanner: fcore::client::LogScanner,
+        inner_scanner: fcore::client::RecordBatchLogScanner,
         admin: fcore::client::FlussAdmin,
         table_info: fcore::metadata::TableInfo,
     ) -> Self {

Reply via email to