This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 7cc7bf6 chore: Add blocking poll into python bindings (#154)
7cc7bf6 is described below
commit 7cc7bf6610b511a90c6ecd222f6dcf721d76f845
Author: Anton Borisov <[email protected]>
AuthorDate: Tue Jan 27 02:37:13 2026 +0100
chore: Add blocking poll into python bindings (#154)
---
bindings/python/example/example.py | 22 +++++
bindings/python/src/table.rs | 175 ++++++++++++++++++++++++++-----------
2 files changed, 145 insertions(+), 52 deletions(-)
diff --git a/bindings/python/example/example.py
b/bindings/python/example/example.py
index 730416b..5d0302e 100644
--- a/bindings/python/example/example.py
+++ b/bindings/python/example/example.py
@@ -224,6 +224,28 @@ async def main():
# TODO: support to_duckdb()
+ # Test the new poll() method for incremental reading
+ print("\n--- Testing poll() method ---")
+ # Reset subscription to start from the beginning
+ log_scanner.subscribe(None, None)
+
+ # Poll with a timeout of 5000ms (5 seconds)
+ # Note: poll() returns an empty table (not an error) on timeout
+ try:
+ poll_result = log_scanner.poll(5000)
+ print(f"Number of rows: {poll_result.num_rows}")
+
+ if poll_result.num_rows > 0:
+ poll_df = poll_result.to_pandas()
+ print(f"Polled data:\n{poll_df}")
+ else:
+ print("Empty result (no records available)")
+ # Empty table still has schema
+ print(f"Schema: {poll_result.schema}")
+
+ except Exception as e:
+ print(f"Error during poll: {e}")
+
except Exception as e:
print(f"Error during scanning: {e}")
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index 0ae7186..4043350 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -20,10 +20,13 @@ use crate::*;
use arrow::array::RecordBatch;
use arrow_pyarrow::{FromPyArrow, ToPyArrow};
use fluss::client::EARLIEST_OFFSET;
+use fluss::record::to_arrow_schema;
use fluss::rpc::message::OffsetSpec;
use pyo3::types::IntoPyDict;
use pyo3_async_runtimes::tokio::future_into_py;
+use std::collections::HashMap;
use std::sync::Arc;
+use std::time::Duration;
// Time conversion constants
const MILLIS_PER_SECOND: i64 = 1_000;
@@ -186,7 +189,7 @@ impl FlussTable {
}
let rust_scanner = table_scan
- .create_log_scanner()
+ .create_record_batch_log_scanner()
.map_err(|e| FlussError::new_err(format!("Failed to create log
scanner: {e}")))?;
let admin = conn
@@ -886,7 +889,7 @@ fn get_type_name(value: &Bound<PyAny>) -> String {
/// Scanner for reading log data from a Fluss table
#[pyclass]
pub struct LogScanner {
- inner: fcore::client::LogScanner,
+ inner: fcore::client::RecordBatchLogScanner,
admin: fcore::client::FlussAdmin,
table_info: fcore::metadata::TableInfo,
#[allow(dead_code)]
@@ -931,63 +934,78 @@ impl LogScanner {
/// Convert all data to Arrow Table
fn to_arrow(&self, py: Python) -> PyResult<Py<PyAny>> {
- use std::collections::HashMap;
- use std::time::Duration;
-
let mut all_batches = Vec::new();
let num_buckets = self.table_info.get_num_buckets();
let bucket_ids: Vec<i32> = (0..num_buckets).collect();
// todo: after supporting list_offsets with timestamp, we can use
start_timestamp and end_timestamp here
- let mut stopping_offsets: HashMap<i32, i64> = TOKIO_RUNTIME
- .block_on(async {
- self.admin
- .list_offsets(
- &self.table_info.table_path,
- bucket_ids.as_slice(),
- OffsetSpec::Latest,
- )
- .await
+ let mut stopping_offsets: HashMap<i32, i64> = py
+ .detach(|| {
+ TOKIO_RUNTIME.block_on(async {
+ self.admin
+ .list_offsets(
+ &self.table_info.table_path,
+ bucket_ids.as_slice(),
+ OffsetSpec::Latest,
+ )
+ .await
+ })
})
.map_err(|e| FlussError::new_err(e.to_string()))?;
- if !stopping_offsets.is_empty() {
- loop {
- let batch_result = TOKIO_RUNTIME
- .block_on(async {
self.inner.poll(Duration::from_millis(500)).await });
-
- match batch_result {
- Ok(scan_records) => {
- let mut result_records: Vec<fcore::record::ScanRecord>
= vec![];
- for (bucket, records) in
scan_records.into_records_by_buckets() {
- let stopping_offset =
stopping_offsets.get(&bucket.bucket_id());
-
- if stopping_offset.is_none() {
- // not to include this bucket, skip records
for this bucket
- // since we already reach end offset for this
bucket
- continue;
- }
- if let Some(last_record) = records.last() {
- let offset = last_record.offset();
- result_records.extend(records);
- if offset >= stopping_offset.unwrap() - 1 {
-
stopping_offsets.remove(&bucket.bucket_id());
- }
- }
- }
-
- if !result_records.is_empty() {
- let arrow_batch =
Utils::convert_scan_records_to_arrow(result_records);
- all_batches.extend(arrow_batch);
- }
-
- // we have reach end offsets of all bucket
- if stopping_offsets.is_empty() {
- break;
- }
- }
- Err(e) => return Err(FlussError::new_err(e.to_string())),
+ // Filter out buckets with no records to read (stop_at <= 0)
+ stopping_offsets.retain(|_, &mut v| v > 0);
+
+ while !stopping_offsets.is_empty() {
+ let scan_batches = py
+ .detach(|| {
+ TOKIO_RUNTIME
+ .block_on(async {
self.inner.poll(Duration::from_millis(500)).await })
+ })
+ .map_err(|e| FlussError::new_err(e.to_string()))?;
+
+ if scan_batches.is_empty() {
+ continue;
+ }
+
+ for scan_batch in scan_batches {
+ let bucket_id = scan_batch.bucket().bucket_id();
+
+ // Check if this bucket is still being tracked; if not, ignore
the batch
+ let Some(&stop_at) = stopping_offsets.get(&bucket_id) else {
+ continue;
+ };
+
+ let base_offset = scan_batch.base_offset();
+ let last_offset = scan_batch.last_offset();
+
+ // If the batch starts at or after the stop_at offset, the
bucket is exhausted
+ if base_offset >= stop_at {
+ stopping_offsets.remove(&bucket_id);
+ continue;
+ }
+
+ let batch = if last_offset >= stop_at {
+ // This batch contains the target offset; slice it to keep
only records
+ // where offset < stop_at.
+ let num_to_keep = (stop_at - base_offset) as usize;
+ let b = scan_batch.into_batch();
+
+ // Safety check: ensure we don't attempt to slice more
rows than the batch contains
+ let limit = num_to_keep.min(b.num_rows());
+ b.slice(0, limit)
+ } else {
+ // The entire batch is within the desired range (all
offsets < stop_at)
+ scan_batch.into_batch()
+ };
+
+ all_batches.push(Arc::new(batch));
+
+ // If the batch's last offset reached or passed the inclusive
limit (stop_at - 1),
+ // we are done with this bucket.
+ if last_offset >= stop_at - 1 {
+ stopping_offsets.remove(&bucket_id);
}
}
}
@@ -1004,15 +1022,68 @@ impl LogScanner {
Ok(df)
}
+    /// Poll for new records with the specified timeout
+    ///
+    /// Blocks for at most ``timeout_ms`` milliseconds. The GIL is released
+    /// while waiting, and pending Python signals (e.g. Ctrl-C) are checked
+    /// periodically so a long poll can be interrupted.
+    ///
+    /// Args:
+    ///     timeout_ms: Timeout in milliseconds to wait for records
+    ///
+    /// Returns:
+    ///     PyArrow Table containing the polled records
+    ///
+    /// Raises:
+    ///     FlussError: if ``timeout_ms`` is negative or the underlying poll fails
+    ///     KeyboardInterrupt (or whatever a signal handler raises): if a
+    ///         signal arrives while waiting
+    ///
+    /// Note:
+    ///     - Returns an empty table (with correct schema) if no records are available
+    ///     - When timeout expires, returns an empty table (NOT an error)
+    fn poll(&self, py: Python, timeout_ms: i64) -> PyResult<Py<PyAny>> {
+        // Upper bound on how long we stay inside Rust without checking for
+        // pending Python signals. Without this, Ctrl-C is ignored until the
+        // whole (possibly very long) timeout has elapsed.
+        const SIGNAL_CHECK_INTERVAL: Duration = Duration::from_millis(100);
+
+        // `try_from` both rejects negative values and performs the
+        // conversion, so no `as` cast is needed. Inside the closure,
+        // `timeout_ms` still refers to the original i64 argument.
+        let timeout_ms = u64::try_from(timeout_ms).map_err(|_| {
+            FlussError::new_err(format!(
+                "timeout_ms must be non-negative, got: {timeout_ms}"
+            ))
+        })?;
+        let timeout = Duration::from_millis(timeout_ms);
+        // Track elapsed time rather than computing `now + timeout`, which
+        // could overflow `Instant` for huge timeouts.
+        let started = std::time::Instant::now();
+
+        loop {
+            let remaining = timeout.saturating_sub(started.elapsed());
+            let wait = remaining.min(SIGNAL_CHECK_INTERVAL);
+
+            let scan_batches = py
+                .detach(|| TOKIO_RUNTIME.block_on(async { self.inner.poll(wait).await }))
+                .map_err(|e| FlussError::new_err(e.to_string()))?;
+
+            if !scan_batches.is_empty() {
+                // Convert ScanBatch to Arrow batches
+                let arrow_batches: Vec<_> = scan_batches
+                    .into_iter()
+                    .map(|scan_batch| Arc::new(scan_batch.into_batch()))
+                    .collect();
+                return Utils::combine_batches_to_table(py, arrow_batches);
+            }
+
+            // This slice covered the rest of the timeout and produced
+            // nothing: report an empty table, not an error.
+            if wait >= remaining {
+                return self.create_empty_table(py);
+            }
+
+            // Let Python raise KeyboardInterrupt (or other signal-handler
+            // exceptions) before waiting again.
+            py.check_signals()?;
+        }
+    }
+
     fn __repr__(&self) -> String {
         format!("LogScanner(table={})", self.table_info.table_path)
     }
 }

 impl LogScanner {
+    /// Create an empty PyArrow table whose schema is derived from this
+    /// table's row type.
+    ///
+    /// Lives in this Rust-only `impl` block (not the Python-facing one that
+    /// holds `poll`/`__repr__`) so pyo3 does not export it as a public
+    /// `LogScanner.create_empty_table()` method.
+    ///
+    /// NOTE(review): the schema comes from the full row type. If the scanner
+    /// was created with a column projection, this may not match the schema of
+    /// non-empty poll results — confirm.
+    fn create_empty_table(&self, py: Python) -> PyResult<Py<PyAny>> {
+        let arrow_schema = to_arrow_schema(self.table_info.get_row_type())
+            .map_err(|e| FlussError::new_err(format!("Failed to get arrow schema: {e}")))?;
+        let py_schema = arrow_schema
+            .as_ref()
+            .to_pyarrow(py)
+            .map_err(|e| FlussError::new_err(format!("Failed to convert schema: {e}")))?;
+
+        // `pyarrow.Table.from_batches([], schema)` yields a zero-row table
+        // that still carries the schema, so callers can inspect `.schema`.
+        let no_batches: Vec<Py<PyAny>> = Vec::new();
+        let empty_table = py
+            .import("pyarrow")?
+            .getattr("Table")?
+            .call_method1("from_batches", (no_batches, py_schema))?;
+
+        Ok(empty_table.into())
+    }
+
- /// Create LogScanner from core LogScanner
+ /// Create LogScanner from core RecordBatchLogScanner
pub fn from_core(
- inner_scanner: fcore::client::LogScanner,
+ inner_scanner: fcore::client::RecordBatchLogScanner,
admin: fcore::client::FlussAdmin,
table_info: fcore::metadata::TableInfo,
) -> Self {