This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 0e50dc2 feat: Support different poll methods (#246)
0e50dc2 is described below
commit 0e50dc21220a55bb5d186ee9c9347dc20e8bfc12
Author: Anton Borisov <[email protected]>
AuthorDate: Thu Feb 5 22:27:58 2026 +0000
feat: Support different poll methods (#246)
---
bindings/python/example/example.py | 92 +++++-
bindings/python/fluss/__init__.pyi | 244 ++++++++++++++-
bindings/python/src/lib.rs | 4 +
bindings/python/src/metadata.rs | 51 +++
bindings/python/src/table.rs | 620 ++++++++++++++++++++++++++++++-------
bindings/python/src/utils.rs | 17 +-
6 files changed, 885 insertions(+), 143 deletions(-)
diff --git a/bindings/python/example/example.py b/bindings/python/example/example.py
index c359425..9cb8f43 100644
--- a/bindings/python/example/example.py
+++ b/bindings/python/example/example.py
@@ -236,31 +236,32 @@ async def main():
print(f"Error during writing: {e}")
# Now scan the table to verify data was written
- print("\n--- Scanning table ---")
+ print("\n--- Scanning table (batch scanner) ---")
try:
- log_scanner = await table.new_log_scanner()
- print(f"Created log scanner: {log_scanner}")
+ # Use new_scan().create_batch_scanner() for batch-based operations
+ batch_scanner = await table.new_scan().create_batch_scanner()
+ print(f"Created batch scanner: {batch_scanner}")
# Subscribe to scan from earliest to latest
# start_timestamp=None (earliest), end_timestamp=None (latest)
- log_scanner.subscribe(None, None)
+ batch_scanner.subscribe(None, None)
print("Scanning results using to_arrow():")
# Try to get as PyArrow Table
try:
- pa_table_result = log_scanner.to_arrow()
+ pa_table_result = batch_scanner.to_arrow()
print(f"\nAs PyArrow Table: {pa_table_result}")
except Exception as e:
print(f"Could not convert to PyArrow: {e}")
# Let's subscribe from the beginning again.
# Reset subscription
- log_scanner.subscribe(None, None)
+ batch_scanner.subscribe(None, None)
# Try to get as Pandas DataFrame
try:
- df_result = log_scanner.to_pandas()
+ df_result = batch_scanner.to_pandas()
print(f"\nAs Pandas DataFrame:\n{df_result}")
except Exception as e:
print(f"Could not convert to Pandas: {e}")
@@ -270,15 +271,15 @@ async def main():
# TODO: support to_duckdb()
- # Test the new poll() method for incremental reading
- print("\n--- Testing poll() method ---")
+ # Test poll_arrow() method for incremental reading as Arrow Table
+ print("\n--- Testing poll_arrow() method ---")
# Reset subscription to start from the beginning
- log_scanner.subscribe(None, None)
+ batch_scanner.subscribe(None, None)
# Poll with a timeout of 5000ms (5 seconds)
- # Note: poll() returns an empty table (not an error) on timeout
+ # Note: poll_arrow() returns an empty table (not an error) on timeout
try:
- poll_result = log_scanner.poll(5000)
+ poll_result = batch_scanner.poll_arrow(5000)
print(f"Number of rows: {poll_result.num_rows}")
if poll_result.num_rows > 0:
@@ -289,11 +290,58 @@ async def main():
# Empty table still has schema
print(f"Schema: {poll_result.schema}")
+ except Exception as e:
+ print(f"Error during poll_arrow: {e}")
+
+ # Test poll_batches() method for batches with metadata
+ print("\n--- Testing poll_batches() method ---")
+ batch_scanner.subscribe(None, None)
+
+ try:
+ batches = batch_scanner.poll_batches(5000)
+ print(f"Number of batches: {len(batches)}")
+
+ for i, batch in enumerate(batches):
+ print(f" Batch {i}: bucket={batch.bucket}, "
+ f"offsets={batch.base_offset}-{batch.last_offset}, "
+ f"rows={batch.batch.num_rows}")
+
+ except Exception as e:
+ print(f"Error during poll_batches: {e}")
+
+ except Exception as e:
+ print(f"Error during batch scanning: {e}")
+
+ # Test record-based scanning with poll()
+ print("\n--- Scanning table (record scanner) ---")
+ try:
+ # Use new_scan().create_log_scanner() for record-based operations
+ record_scanner = await table.new_scan().create_log_scanner()
+ print(f"Created record scanner: {record_scanner}")
+
+ record_scanner.subscribe(None, None)
+
+ # Poll returns List[ScanRecord] with per-record metadata
+ print("\n--- Testing poll() method (record-by-record) ---")
+ try:
+ records = record_scanner.poll(5000)
+ print(f"Number of records: {len(records)}")
+
+ # Show first few records with metadata
+ for i, record in enumerate(records[:5]):
+ print(f" Record {i}: offset={record.offset}, "
+ f"timestamp={record.timestamp}, "
+ f"change_type={record.change_type}, "
+ f"row={record.row}")
+
+ if len(records) > 5:
+ print(f" ... and {len(records) - 5} more records")
+
except Exception as e:
print(f"Error during poll: {e}")
except Exception as e:
- print(f"Error during scanning: {e}")
+ print(f"Error during record scanning: {e}")
# =====================================================
# Demo: Primary Key Table with Lookup and Upsert
@@ -488,12 +536,12 @@ async def main():
print(f"Error during delete: {e}")
traceback.print_exc()
- # Demo: Column projection
+ # Demo: Column projection using builder pattern
print("\n--- Testing Column Projection ---")
try:
- # Project specific columns by index
+ # Project specific columns by index (using batch scanner for to_pandas)
print("\n1. Projection by index [0, 1] (id, name):")
- scanner_index = await table.new_log_scanner(project=[0, 1])
+ scanner_index = await table.new_scan().project([0,
1]).create_batch_scanner()
scanner_index.subscribe(None, None)
df_projected = scanner_index.to_pandas()
print(df_projected.head())
@@ -503,12 +551,22 @@ async def main():
# Project specific columns by name (Pythonic!)
print("\n2. Projection by name ['name', 'score'] (Pythonic):")
- scanner_names = await table.new_log_scanner(columns=["name", "score"])
+ scanner_names = await table.new_scan() \
+ .project_by_name(["name", "score"]) \
+ .create_batch_scanner()
scanner_names.subscribe(None, None)
df_named = scanner_names.to_pandas()
print(df_named.head())
print(f" Projected {df_named.shape[1]} columns:
{list(df_named.columns)}")
+ # Test empty result schema with projection
+ print("\n3. Testing empty result schema with projection:")
+ scanner_proj = await table.new_scan().project([0,
2]).create_batch_scanner()
+ scanner_proj.subscribe(None, None)
+ # Quick poll that may return empty
+ result = scanner_proj.poll_arrow(100)
+ print(f" Schema columns: {result.schema.names}")
+
except Exception as e:
print(f"Error during projection: {e}")
diff --git a/bindings/python/fluss/__init__.pyi b/bindings/python/fluss/__init__.pyi
index c911ebe..40d18f6 100644
--- a/bindings/python/fluss/__init__.pyi
+++ b/bindings/python/fluss/__init__.pyi
@@ -17,12 +17,79 @@
"""Type stubs for Fluss Python bindings."""
+from enum import IntEnum
from types import TracebackType
from typing import Dict, List, Optional, Tuple
import pandas as pd
import pyarrow as pa
+class ChangeType(IntEnum):
+ """Represents the type of change for a record in a log."""
+
+ AppendOnly = 0
+ """Append-only operation"""
+ Insert = 1
+ """Insert operation"""
+ UpdateBefore = 2
+ """Update operation containing the previous content of the updated row"""
+ UpdateAfter = 3
+ """Update operation containing the new content of the updated row"""
+ Delete = 4
+ """Delete operation"""
+
+ def short_string(self) -> str:
+ """Returns a short string representation (+A, +I, -U, +U, -D)."""
+ ...
+
+class ScanRecord:
+ """Represents a single scan record with metadata."""
+
+ @property
+ def bucket(self) -> TableBucket:
+ """The bucket this record belongs to."""
+ ...
+ @property
+ def offset(self) -> int:
+ """The position of this record in the log."""
+ ...
+ @property
+ def timestamp(self) -> int:
+ """The timestamp of this record."""
+ ...
+ @property
+ def change_type(self) -> ChangeType:
+ """The type of change (insert, update, delete, etc.)."""
+ ...
+ @property
+ def row(self) -> Dict[str, object]:
+ """The row data as a dictionary mapping column names to values."""
+ ...
+ def __str__(self) -> str: ...
+ def __repr__(self) -> str: ...
+
+class RecordBatch:
+ """Represents a batch of records with metadata."""
+
+ @property
+ def batch(self) -> pa.RecordBatch:
+ """The Arrow RecordBatch containing the data."""
+ ...
+ @property
+ def bucket(self) -> TableBucket:
+ """The bucket this batch belongs to."""
+ ...
+ @property
+ def base_offset(self) -> int:
+ """The offset of the first record in this batch."""
+ ...
+ @property
+ def last_offset(self) -> int:
+ """The offset of the last record in this batch."""
+ ...
+ def __str__(self) -> str: ...
+ def __repr__(self) -> str: ...
+
class Config:
def __init__(self, properties: Optional[Dict[str, str]] = None) -> None:
...
@property
@@ -64,13 +131,92 @@ class FlussAdmin:
async def get_latest_lake_snapshot(self, table_path: TablePath) ->
LakeSnapshot: ...
def __repr__(self) -> str: ...
+class TableScan:
+ """Builder for creating log scanners with flexible configuration.
+
+ Use this builder to configure projection before creating a log scanner.
+ Obtain a TableScan instance via `FlussTable.new_scan()`.
+
+ Example:
+ ```python
+ # Record-based scanning with projection
+ scanner = await table.new_scan() \\
+ .project([0, 1, 2]) \\
+ .create_log_scanner()
+
+ # Batch-based scanning with column names
+ scanner = await table.new_scan() \\
+ .project_by_name(["id", "name"]) \\
+ .create_batch_scanner()
+ ```
+ """
+
+ def project(self, indices: List[int]) -> "TableScan":
+ """Project to specific columns by their indices.
+
+ Args:
+ indices: List of column indices (0-based) to include in the scan.
+
+ Returns:
+ Self for method chaining.
+ """
+ ...
+ def project_by_name(self, names: List[str]) -> "TableScan":
+ """Project to specific columns by their names.
+
+ Args:
+ names: List of column names to include in the scan.
+
+ Returns:
+ Self for method chaining.
+ """
+ ...
+ async def create_log_scanner(self) -> LogScanner:
+ """Create a record-based log scanner.
+
+ Use this scanner with `poll()` to get individual records with metadata
+ (offset, timestamp, change_type).
+
+ Returns:
+ LogScanner for record-by-record scanning with `poll()`
+ """
+ ...
+ async def create_batch_scanner(self) -> LogScanner:
+ """Create a batch-based log scanner.
+
+ Use this scanner with `poll_arrow()` to get Arrow Tables, or with
+ `poll_batches()` to get individual batches with metadata.
+
+ Returns:
+ LogScanner for batch-based scanning with `poll_arrow()` or
`poll_batches()`
+ """
+ ...
+ def __repr__(self) -> str: ...
+
class FlussTable:
+ def new_scan(self) -> TableScan:
+ """Create a new table scan builder for configuring and creating log
scanners.
+
+ Use this method to create scanners with the builder pattern:
+
+ Example:
+ ```python
+ # Record-based scanning
+ scanner = await table.new_scan() \\
+ .project([0, 1]) \\
+ .create_log_scanner()
+
+ # Batch-based scanning
+ scanner = await table.new_scan() \\
+ .project_by_name(["id", "name"]) \\
+ .create_batch_scanner()
+ ```
+
+ Returns:
+ TableScan builder for configuring the scanner.
+ """
+ ...
async def new_append_writer(self) -> AppendWriter: ...
- async def new_log_scanner(
- self,
- project: Optional[List[int]] = None,
- columns: Optional[List[str]] = None,
- ) -> LogScanner: ...
def new_upsert(
self,
columns: Optional[List[str]] = None,
@@ -159,11 +305,93 @@ class Lookuper:
def __repr__(self) -> str: ...
class LogScanner:
+ """Scanner for reading log data from a Fluss table.
+
+ This scanner supports two modes:
+ - Record-based scanning via `poll()` - returns individual records with
metadata
+ - Batch-based scanning via `poll_arrow()` / `poll_batches()` - returns
Arrow batches
+
+ Create scanners using the builder pattern:
+ # Record-based scanning
+ scanner = await table.new_scan().create_log_scanner()
+
+ # Batch-based scanning
+ scanner = await table.new_scan().create_batch_scanner()
+
+ # With projection
+ scanner = await table.new_scan().project([0, 1]).create_log_scanner()
+ """
+
def subscribe(
self, start_timestamp: Optional[int], end_timestamp: Optional[int]
- ) -> None: ...
- def to_pandas(self) -> pd.DataFrame: ...
- def to_arrow(self) -> pa.Table: ...
+ ) -> None:
+ """Subscribe to log data with timestamp range.
+
+ Args:
+ start_timestamp: Not yet supported, must be None.
+ end_timestamp: Not yet supported, must be None.
+ """
+ ...
+ def poll(self, timeout_ms: int) -> List[ScanRecord]:
+ """Poll for individual records with metadata.
+
+ Requires a record-based scanner (created with
new_scan().create_log_scanner()).
+
+ Args:
+ timeout_ms: Timeout in milliseconds to wait for records.
+
+ Returns:
+ List of ScanRecord objects, each containing bucket, offset,
timestamp,
+ change_type, and row data as a dictionary.
+
+ Note:
+ Returns an empty list if no records are available or timeout
expires.
+ """
+ ...
+ def poll_batches(self, timeout_ms: int) -> List[RecordBatch]:
+ """Poll for batches with metadata.
+
+ Requires a batch-based scanner (created with
new_scan().create_batch_scanner()).
+
+ Args:
+ timeout_ms: Timeout in milliseconds to wait for batches.
+
+ Returns:
+ List of RecordBatch objects, each containing the Arrow batch along
with
+ bucket, base_offset, and last_offset metadata.
+
+ Note:
+ Returns an empty list if no batches are available or timeout
expires.
+ """
+ ...
+ def poll_arrow(self, timeout_ms: int) -> pa.Table:
+ """Poll for records as an Arrow Table.
+
+ Requires a batch-based scanner (created with
new_scan().create_batch_scanner()).
+
+ Args:
+ timeout_ms: Timeout in milliseconds to wait for records.
+
+ Returns:
+ PyArrow Table containing the polled records (batches merged).
+
+ Note:
+ Returns an empty table (with correct schema) if no records are
available
+ or timeout expires.
+ """
+ ...
+ def to_pandas(self) -> pd.DataFrame:
+ """Convert all data to Pandas DataFrame.
+
+ Requires a batch-based scanner (created with
new_scan().create_batch_scanner()).
+ """
+ ...
+ def to_arrow(self) -> pa.Table:
+ """Convert all data to Arrow Table.
+
+ Requires a batch-based scanner (created with
new_scan().create_batch_scanner()).
+ """
+ ...
def __repr__(self) -> str: ...
class Schema:
diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs
index 3da0b25..ce063ab 100644
--- a/bindings/python/src/lib.rs
+++ b/bindings/python/src/lib.rs
@@ -58,6 +58,7 @@ fn _fluss(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<TableDescriptor>()?;
m.add_class::<FlussAdmin>()?;
m.add_class::<FlussTable>()?;
+ m.add_class::<TableScan>()?;
m.add_class::<AppendWriter>()?;
m.add_class::<UpsertWriter>()?;
m.add_class::<Lookuper>()?;
@@ -65,6 +66,9 @@ fn _fluss(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<LogScanner>()?;
m.add_class::<LakeSnapshot>()?;
m.add_class::<TableBucket>()?;
+ m.add_class::<ChangeType>()?;
+ m.add_class::<ScanRecord>()?;
+ m.add_class::<RecordBatch>()?;
// Register exception types
m.add_class::<FlussError>()?;
diff --git a/bindings/python/src/metadata.rs b/bindings/python/src/metadata.rs
index f422696..f39f9d4 100644
--- a/bindings/python/src/metadata.rs
+++ b/bindings/python/src/metadata.rs
@@ -19,6 +19,57 @@ use crate::*;
use pyo3::types::PyDict;
use std::collections::HashMap;
+/// Represents the type of change for a record in a log
+#[pyclass(eq, eq_int)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub enum ChangeType {
+ /// Append-only operation
+ AppendOnly = 0,
+ /// Insert operation
+ Insert = 1,
+ /// Update operation containing the previous content of the updated row
+ UpdateBefore = 2,
+ /// Update operation containing the new content of the updated row
+ UpdateAfter = 3,
+ /// Delete operation
+ Delete = 4,
+}
+
+#[pymethods]
+impl ChangeType {
+ /// Returns a short string representation of this ChangeType
+ pub fn short_string(&self) -> &'static str {
+ match self {
+ ChangeType::AppendOnly => "+A",
+ ChangeType::Insert => "+I",
+ ChangeType::UpdateBefore => "-U",
+ ChangeType::UpdateAfter => "+U",
+ ChangeType::Delete => "-D",
+ }
+ }
+
+ fn __str__(&self) -> &'static str {
+ self.short_string()
+ }
+
+ fn __repr__(&self) -> String {
+ format!("ChangeType.{:?}", self)
+ }
+}
+
+impl ChangeType {
+ /// Convert from core ChangeType
+ pub fn from_core(change_type: fcore::record::ChangeType) -> Self {
+ match change_type {
+ fcore::record::ChangeType::AppendOnly => ChangeType::AppendOnly,
+ fcore::record::ChangeType::Insert => ChangeType::Insert,
+ fcore::record::ChangeType::UpdateBefore =>
ChangeType::UpdateBefore,
+ fcore::record::ChangeType::UpdateAfter => ChangeType::UpdateAfter,
+ fcore::record::ChangeType::Delete => ChangeType::Delete,
+ }
+ }
+}
+
/// Represents a table path with database and table name
#[pyclass]
#[derive(Clone)]
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index 4554ca1..30c7ce0 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -17,8 +17,9 @@
use crate::TOKIO_RUNTIME;
use crate::*;
-use arrow::array::RecordBatch;
+use arrow::array::RecordBatch as ArrowRecordBatch;
use arrow_pyarrow::{FromPyArrow, ToPyArrow};
+use arrow_schema::SchemaRef;
use fluss::client::EARLIEST_OFFSET;
use fluss::record::to_arrow_schema;
use fluss::rpc::message::OffsetSpec;
@@ -38,6 +39,123 @@ const MICROS_PER_DAY: i64 = 86_400_000_000;
const NANOS_PER_MILLI: i64 = 1_000_000;
const NANOS_PER_MICRO: i64 = 1_000;
+/// Represents a single scan record with metadata
+#[pyclass]
+pub struct ScanRecord {
+ #[pyo3(get)]
+ bucket: TableBucket,
+ #[pyo3(get)]
+ offset: i64,
+ #[pyo3(get)]
+ timestamp: i64,
+ #[pyo3(get)]
+ change_type: ChangeType,
+ /// Store row as a Python dict directly
+ row_dict: Py<pyo3::types::PyDict>,
+}
+
+#[pymethods]
+impl ScanRecord {
+ /// Get the row data as a dictionary
+ #[getter]
+ pub fn row(&self, py: Python) -> Py<pyo3::types::PyDict> {
+ self.row_dict.clone_ref(py)
+ }
+
+ fn __str__(&self) -> String {
+ format!(
+ "ScanRecord(bucket={}, offset={}, timestamp={}, change_type={})",
+ self.bucket.__str__(),
+ self.offset,
+ self.timestamp,
+ self.change_type.short_string()
+ )
+ }
+
+ fn __repr__(&self) -> String {
+ self.__str__()
+ }
+}
+
+impl ScanRecord {
+ /// Create a ScanRecord from core types
+ pub fn from_core(
+ py: Python,
+ bucket: &fcore::metadata::TableBucket,
+ record: &fcore::record::ScanRecord,
+ row_type: &fcore::metadata::RowType,
+ ) -> PyResult<Self> {
+ let fields = row_type.fields();
+ let row = record.row();
+ let dict = pyo3::types::PyDict::new(py);
+
+ for (pos, field) in fields.iter().enumerate() {
+ let value = datum_to_python_value(py, row, pos,
field.data_type())?;
+ dict.set_item(field.name(), value)?;
+ }
+
+ Ok(ScanRecord {
+ bucket: TableBucket::from_core(bucket.clone()),
+ offset: record.offset(),
+ timestamp: record.timestamp(),
+ change_type: ChangeType::from_core(*record.change_type()),
+ row_dict: dict.unbind(),
+ })
+ }
+}
+
+/// Represents a batch of records with metadata
+#[pyclass]
+pub struct RecordBatch {
+ batch: Arc<ArrowRecordBatch>,
+ #[pyo3(get)]
+ bucket: TableBucket,
+ #[pyo3(get)]
+ base_offset: i64,
+ #[pyo3(get)]
+ last_offset: i64,
+}
+
+#[pymethods]
+impl RecordBatch {
+ /// Get the Arrow RecordBatch as PyArrow RecordBatch
+ #[getter]
+ pub fn batch(&self, py: Python) -> PyResult<Py<PyAny>> {
+ let pyarrow_batch = self
+ .batch
+ .as_ref()
+ .to_pyarrow(py)
+ .map_err(|e| FlussError::new_err(format!("Failed to convert batch:
{e}")))?;
+ Ok(pyarrow_batch.unbind())
+ }
+
+ fn __str__(&self) -> String {
+ format!(
+ "RecordBatch(bucket={}, base_offset={}, last_offset={}, rows={})",
+ self.bucket.__str__(),
+ self.base_offset,
+ self.last_offset,
+ self.batch.num_rows()
+ )
+ }
+
+ fn __repr__(&self) -> String {
+ self.__str__()
+ }
+}
+
+impl RecordBatch {
+ /// Create a RecordBatch from core ScanBatch
+ pub fn from_scan_batch(scan_batch: fcore::record::ScanBatch) -> Self {
+ RecordBatch {
+ bucket: TableBucket::from_core(scan_batch.bucket().clone()),
+ base_offset: scan_batch.base_offset(),
+ last_offset: scan_batch.last_offset(),
+ batch: Arc::new(scan_batch.into_batch()),
+ }
+ }
+}
+
/// Represents a Fluss table for data operations
#[pyclass]
pub struct FlussTable {
@@ -48,14 +166,233 @@ pub struct FlussTable {
has_primary_key: bool,
}
+/// Builder for creating log scanners with flexible configuration.
+///
+/// Use this builder to configure projection, and in the future, filters
+/// before creating a log scanner.
+#[pyclass]
+pub struct TableScan {
+ connection: Arc<fcore::client::FlussConnection>,
+ metadata: Arc<fcore::client::Metadata>,
+ table_info: fcore::metadata::TableInfo,
+ projection: Option<ProjectionType>,
+}
+
+/// Scanner type for internal use
+enum ScannerType {
+ Record,
+ Batch,
+}
+
+#[pymethods]
+impl TableScan {
+ /// Project to specific columns by their indices.
+ ///
+ /// Args:
+ /// indices: List of column indices (0-based) to include in the scan.
+ ///
+ /// Returns:
+ /// Self for method chaining.
+ pub fn project(mut slf: PyRefMut<'_, Self>, indices: Vec<usize>) ->
PyRefMut<'_, Self> {
+ slf.projection = Some(ProjectionType::Indices(indices));
+ slf
+ }
+
+ /// Project to specific columns by their names.
+ ///
+ /// Args:
+ /// names: List of column names to include in the scan.
+ ///
+ /// Returns:
+ /// Self for method chaining.
+ pub fn project_by_name(mut slf: PyRefMut<'_, Self>, names: Vec<String>) ->
PyRefMut<'_, Self> {
+ slf.projection = Some(ProjectionType::Names(names));
+ slf
+ }
+
+ /// Create a record-based log scanner.
+ ///
+ /// Use this scanner with `poll()` to get individual records with metadata
+ /// (offset, timestamp, change_type).
+ ///
+ /// Returns:
+ /// LogScanner for record-by-record scanning with `poll()`
+ pub fn create_log_scanner<'py>(&self, py: Python<'py>) ->
PyResult<Bound<'py, PyAny>> {
+ self.create_scanner_internal(py, ScannerType::Record)
+ }
+
+ /// Create a batch-based log scanner.
+ ///
+ /// Use this scanner with `poll_arrow()` to get Arrow Tables, or with
+ /// `poll_batches()` to get individual batches with metadata.
+ ///
+ /// Returns:
+ /// LogScanner for batch-based scanning with `poll_arrow()` or
`poll_batches()`
+ pub fn create_batch_scanner<'py>(&self, py: Python<'py>) ->
PyResult<Bound<'py, PyAny>> {
+ self.create_scanner_internal(py, ScannerType::Batch)
+ }
+
+ fn __repr__(&self) -> String {
+ format!(
+ "TableScan(table={}.{})",
+ self.table_info.table_path.database(),
+ self.table_info.table_path.table()
+ )
+ }
+}
+
+impl TableScan {
+ fn create_scanner_internal<'py>(
+ &self,
+ py: Python<'py>,
+ scanner_type: ScannerType,
+ ) -> PyResult<Bound<'py, PyAny>> {
+ let conn = self.connection.clone();
+ let metadata = self.metadata.clone();
+ let table_info = self.table_info.clone();
+ let projection = self.projection.clone();
+
+ future_into_py(py, async move {
+ let fluss_table = fcore::client::FlussTable::new(&conn, metadata,
table_info.clone());
+
+ let projection_indices = resolve_projection_indices(&projection,
&table_info)?;
+ let table_scan = apply_projection(fluss_table.new_scan(),
projection)?;
+
+ let admin = conn
+ .get_admin()
+ .await
+ .map_err(|e| FlussError::new_err(e.to_string()))?;
+
+ let (projected_schema, projected_row_type) =
+ calculate_projected_types(&table_info, projection_indices)?;
+
+ let py_scanner = match scanner_type {
+ ScannerType::Record => {
+ let rust_scanner =
table_scan.create_log_scanner().map_err(|e| {
+ FlussError::new_err(format!("Failed to create log
scanner: {e}"))
+ })?;
+ LogScanner::from_log_scanner(
+ rust_scanner,
+ admin,
+ table_info,
+ projected_schema,
+ projected_row_type,
+ )
+ }
+ ScannerType::Batch => {
+ let rust_scanner =
+
table_scan.create_record_batch_log_scanner().map_err(|e| {
+ FlussError::new_err(format!("Failed to create
batch scanner: {e}"))
+ })?;
+ LogScanner::from_batch_scanner(
+ rust_scanner,
+ admin,
+ table_info,
+ projected_schema,
+ projected_row_type,
+ )
+ }
+ };
+
+ Python::attach(|py| Py::new(py, py_scanner))
+ })
+ }
+}
+
/// Internal enum to represent different projection types
+#[derive(Clone)]
enum ProjectionType {
Indices(Vec<usize>),
Names(Vec<String>),
}
+/// Resolve projection to column indices
+fn resolve_projection_indices(
+ projection: &Option<ProjectionType>,
+ table_info: &fcore::metadata::TableInfo,
+) -> PyResult<Option<Vec<usize>>> {
+ match projection {
+ Some(ProjectionType::Indices(indices)) => Ok(Some(indices.clone())),
+ Some(ProjectionType::Names(names)) => {
+ let schema = table_info.get_schema();
+ let columns = schema.columns();
+ let mut indices = Vec::with_capacity(names.len());
+ for name in names {
+ let idx = columns
+ .iter()
+ .position(|c| c.name() == name)
+ .ok_or_else(|| FlussError::new_err(format!("Column '{}'
not found", name)))?;
+ indices.push(idx);
+ }
+ Ok(Some(indices))
+ }
+ None => Ok(None),
+ }
+}
+
+/// Apply projection to table scan
+fn apply_projection(
+ table_scan: fcore::client::TableScan,
+ projection: Option<ProjectionType>,
+) -> PyResult<fcore::client::TableScan> {
+ match projection {
+ Some(ProjectionType::Indices(indices)) => table_scan
+ .project(&indices)
+ .map_err(|e| FlussError::new_err(format!("Failed to project
columns: {e}"))),
+ Some(ProjectionType::Names(names)) => {
+ let column_name_refs: Vec<&str> = names.iter().map(|s|
s.as_str()).collect();
+ table_scan
+ .project_by_name(&column_name_refs)
+ .map_err(|e| FlussError::new_err(format!("Failed to project
columns: {e}")))
+ }
+ None => Ok(table_scan),
+ }
+}
+
+/// Calculate projected schema and row type from projection indices
+fn calculate_projected_types(
+ table_info: &fcore::metadata::TableInfo,
+ projection_indices: Option<Vec<usize>>,
+) -> PyResult<(SchemaRef, fcore::metadata::RowType)> {
+ let full_schema = to_arrow_schema(table_info.get_row_type())
+ .map_err(|e| FlussError::new_err(format!("Failed to get arrow schema:
{e}")))?;
+ let full_row_type = table_info.get_row_type();
+
+ match projection_indices {
+ Some(indices) => {
+ let arrow_fields: Vec<_> = indices
+ .iter()
+ .map(|&i| full_schema.field(i).clone())
+ .collect();
+ let row_fields: Vec<_> = indices
+ .iter()
+ .map(|&i| full_row_type.fields()[i].clone())
+ .collect();
+ Ok((
+ Arc::new(arrow_schema::Schema::new(arrow_fields)),
+ fcore::metadata::RowType::new(row_fields),
+ ))
+ }
+ None => Ok((full_schema, full_row_type.clone())),
+ }
+}
+
#[pymethods]
impl FlussTable {
+ /// Create a new table scan builder for configuring and creating log
scanners.
+ ///
+ /// Use this method to create scanners with the builder pattern:
+ /// Returns:
+ /// TableScan builder for configuring the scanner.
+ pub fn new_scan(&self) -> TableScan {
+ TableScan {
+ connection: self.connection.clone(),
+ metadata: self.metadata.clone(),
+ table_info: self.table_info.clone(),
+ projection: None,
+ }
+ }
+
/// Create a new append writer for the table
fn new_append_writer<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py,
PyAny>> {
let conn = self.connection.clone();
@@ -79,41 +416,6 @@ impl FlussTable {
})
}
- /// Create a new log scanner for the table.
- ///
- /// Args:
- /// project: Optional list of column indices (0-based) to include in
the scan.
- /// columns: Optional list of column names to include in the scan.
- ///
- /// Returns:
- /// LogScanner, optionally with projection applied
- ///
- /// Note:
- /// Specify only one of 'project' or 'columns'.
- /// If neither is specified, all columns are included.
- /// Rust side will validate the projection parameters.
- ///
- #[pyo3(signature = (project=None, columns=None))]
- pub fn new_log_scanner<'py>(
- &self,
- py: Python<'py>,
- project: Option<Vec<usize>>,
- columns: Option<Vec<String>>,
- ) -> PyResult<Bound<'py, PyAny>> {
- let projection = match (project, columns) {
- (Some(_), Some(_)) => {
- return Err(FlussError::new_err(
- "Specify only one of 'project' or 'columns'".to_string(),
- ));
- }
- (Some(indices), None) => Some(ProjectionType::Indices(indices)),
- (None, Some(names)) => Some(ProjectionType::Names(names)),
- (None, None) => None,
- };
-
- self.create_log_scanner_internal(py, projection)
- }
-
/// Get table information
pub fn get_table_info(&self) -> TableInfo {
TableInfo::from_core(self.table_info.clone())
@@ -219,55 +521,6 @@ impl FlussTable {
has_primary_key,
}
}
-
- /// Internal helper to create log scanner with optional projection
- fn create_log_scanner_internal<'py>(
- &self,
- py: Python<'py>,
- projection: Option<ProjectionType>,
- ) -> PyResult<Bound<'py, PyAny>> {
- let conn = self.connection.clone();
- let metadata = self.metadata.clone();
- let table_info = self.table_info.clone();
-
- future_into_py(py, async move {
- let fluss_table =
- fcore::client::FlussTable::new(&conn, metadata.clone(),
table_info.clone());
-
- let mut table_scan = fluss_table.new_scan();
-
- // Apply projection if specified
- if let Some(proj) = projection {
- table_scan = match proj {
- ProjectionType::Indices(indices) => {
- table_scan.project(&indices).map_err(|e| {
- FlussError::new_err(format!("Failed to project
columns: {e}"))
- })?
- }
- ProjectionType::Names(names) => {
- // Convert Vec<String> to Vec<&str> for the API
- let column_name_refs: Vec<&str> =
- names.iter().map(|s| s.as_str()).collect();
-
table_scan.project_by_name(&column_name_refs).map_err(|e| {
- FlussError::new_err(format!("Failed to project
columns: {e}"))
- })?
- }
- };
- }
-
- let rust_scanner = table_scan
- .create_record_batch_log_scanner()
- .map_err(|e| FlussError::new_err(format!("Failed to create log
scanner: {e}")))?;
-
- let admin = conn
- .get_admin()
- .await
- .map_err(|e| FlussError::new_err(e.to_string()))?;
-
- let py_scanner = LogScanner::from_core(rust_scanner, admin,
table_info.clone());
- Python::attach(|py| Py::new(py, py_scanner))
- })
- }
}
/// Writer for appending data to a Fluss table
@@ -295,7 +548,7 @@ impl AppendWriter {
pub fn write_arrow_batch(&self, py: Python, batch: Py<PyAny>) ->
PyResult<()> {
// This shares the underlying Arrow buffers without copying data
let batch_bound = batch.bind(py);
- let rust_batch: RecordBatch =
FromPyArrow::from_pyarrow_bound(batch_bound)
+ let rust_batch: ArrowRecordBatch =
FromPyArrow::from_pyarrow_bound(batch_bound)
.map_err(|e| FlussError::new_err(format!("Failed to convert
RecordBatch: {e}")))?;
let inner = self.inner.clone();
@@ -1303,12 +1556,23 @@ fn get_type_name(value: &Bound<PyAny>) -> String {
.unwrap_or_else(|_| "unknown".to_string())
}
-/// Scanner for reading log data from a Fluss table
+/// Scanner for reading log data from a Fluss table.
+///
+/// This scanner supports two modes:
+/// - Record-based scanning via `poll()` - returns individual records with
metadata
+/// - Batch-based scanning via `poll_arrow()` / `poll_batches()` - returns
Arrow batches
#[pyclass]
pub struct LogScanner {
- inner: fcore::client::RecordBatchLogScanner,
+ /// Record-based scanner for poll()
+ inner: Option<fcore::client::LogScanner>,
+ /// Batch-based scanner for poll_arrow/poll_batches
+ inner_batch: Option<fcore::client::RecordBatchLogScanner>,
admin: fcore::client::FlussAdmin,
table_info: fcore::metadata::TableInfo,
+ /// The projected Arrow schema to use for empty table creation
+ projected_schema: SchemaRef,
+ /// The projected row type to use for record-based scanning
+ projected_row_type: fcore::metadata::RowType,
#[allow(dead_code)]
start_timestamp: Option<i64>,
#[allow(dead_code)]
@@ -1338,19 +1602,40 @@ impl LogScanner {
for bucket_id in 0..num_buckets {
let start_offset = EARLIEST_OFFSET;
- TOKIO_RUNTIME.block_on(async {
- self.inner
- .subscribe(bucket_id, start_offset)
- .await
- .map_err(|e| FlussError::new_err(e.to_string()))
- })?;
+ // Subscribe to the appropriate scanner
+ if let Some(ref inner) = self.inner {
+ TOKIO_RUNTIME.block_on(async {
+ inner
+ .subscribe(bucket_id, start_offset)
+ .await
+ .map_err(|e| FlussError::new_err(e.to_string()))
+ })?;
+ } else if let Some(ref inner_batch) = self.inner_batch {
+ TOKIO_RUNTIME.block_on(async {
+ inner_batch
+ .subscribe(bucket_id, start_offset)
+ .await
+ .map_err(|e| FlussError::new_err(e.to_string()))
+ })?;
+ } else {
+ return Err(FlussError::new_err("No scanner available"));
+ }
}
Ok(())
}
/// Convert all data to Arrow Table
+ ///
+ /// Note: Requires a batch-based scanner (created with new_scan().create_batch_scanner()).
fn to_arrow(&self, py: Python) -> PyResult<Py<PyAny>> {
+ let inner_batch = self.inner_batch.as_ref().ok_or_else(|| {
+ FlussError::new_err(
+ "Batch-based scanner not available. Use
new_scan().create_batch_scanner() to create a scanner \
+ that supports to_arrow().",
+ )
+ })?;
+
let mut all_batches = Vec::new();
let num_buckets = self.table_info.get_num_buckets();
@@ -1378,7 +1663,7 @@ impl LogScanner {
let scan_batches = py
.detach(|| {
TOKIO_RUNTIME
- .block_on(async {
self.inner.poll(Duration::from_millis(500)).await })
+ .block_on(async {
inner_batch.poll(Duration::from_millis(500)).await })
})
.map_err(|e| FlussError::new_err(e.to_string()))?;
@@ -1439,18 +1724,114 @@ impl LogScanner {
Ok(df)
}
- /// Poll for new records with the specified timeout
+ /// Poll for individual records with metadata.
///
/// Args:
/// timeout_ms: Timeout in milliseconds to wait for records
///
/// Returns:
- /// PyArrow Table containing the polled records
+ /// List of ScanRecord objects, each containing bucket, offset, timestamp,
+ /// change_type, and row data as a dictionary.
+ ///
+ /// Note:
+ /// - Requires a record-based scanner (created with new_scan().create_log_scanner())
+ /// - Returns an empty list if no records are available
+ /// - When timeout expires, returns an empty list (NOT an error)
+    fn poll(&self, py: Python, timeout_ms: i64) -> PyResult<Vec<ScanRecord>> {
+        // Only scanners built from a record-based core scanner can serve poll().
+        let Some(scanner) = self.inner.as_ref() else {
+            return Err(FlussError::new_err(
+                "Record-based scanner not available. Use new_scan().create_log_scanner() to create a scanner \
+                 that supports poll().",
+            ));
+        };
+
+        // A negative timeout cannot be represented as a Duration; reject it up front.
+        let millis = u64::try_from(timeout_ms).map_err(|_| {
+            FlussError::new_err(format!(
+                "timeout_ms must be non-negative, got: {timeout_ms}"
+            ))
+        })?;
+        let timeout = Duration::from_millis(millis);
+
+        // Block on the async poll with the GIL released.
+        let polled = py
+            .detach(|| TOKIO_RUNTIME.block_on(async { scanner.poll(timeout).await }))
+            .map_err(|e| FlussError::new_err(e.to_string()))?;
+
+        // Decode rows against the projected row type so column projection is honored.
+        // Conversion stops at the first record that fails.
+        let row_type = &self.projected_row_type;
+        polled
+            .into_records_by_buckets()
+            .into_iter()
+            .flat_map(move |(bucket, records)| {
+                records
+                    .into_iter()
+                    .map(move |record| ScanRecord::from_core(py, &bucket, &record, row_type))
+            })
+            .collect()
+    }
+
+ /// Poll for batches with metadata.
+ ///
+ /// Args:
+ /// timeout_ms: Timeout in milliseconds to wait for batches
+ ///
+ /// Returns:
+ /// List of RecordBatch objects, each containing the Arrow batch along with
+ /// bucket, base_offset, and last_offset metadata.
///
/// Note:
+ /// - Requires a batch-based scanner (created with new_scan().create_batch_scanner())
+ /// - Returns an empty list if no batches are available
+ /// - When timeout expires, returns an empty list (NOT an error)
+    fn poll_batches(&self, py: Python, timeout_ms: i64) -> PyResult<Vec<RecordBatch>> {
+        // Only scanners built from a batch-based core scanner can serve poll_batches().
+        let Some(batch_scanner) = self.inner_batch.as_ref() else {
+            return Err(FlussError::new_err(
+                "Batch-based scanner not available. Use new_scan().create_batch_scanner() to create a scanner \
+                 that supports poll_batches().",
+            ));
+        };
+
+        // A negative timeout cannot be represented as a Duration; reject it up front.
+        let millis = u64::try_from(timeout_ms).map_err(|_| {
+            FlussError::new_err(format!(
+                "timeout_ms must be non-negative, got: {timeout_ms}"
+            ))
+        })?;
+        let timeout = Duration::from_millis(millis);
+
+        // Block on the async poll with the GIL released.
+        let polled = py
+            .detach(|| TOKIO_RUNTIME.block_on(async { batch_scanner.poll(timeout).await }))
+            .map_err(|e| FlussError::new_err(e.to_string()))?;
+
+        // Wrap each ScanBatch so Python sees the Arrow data plus its bucket/offset metadata.
+        Ok(polled.into_iter().map(RecordBatch::from_scan_batch).collect())
+    }
+
+ /// Poll for new records as an Arrow Table.
+ ///
+ /// Args:
+ /// timeout_ms: Timeout in milliseconds to wait for records
+ ///
+ /// Returns:
+ /// PyArrow Table containing the polled records (batches merged)
+ ///
+ /// Note:
+ /// - Requires a batch-based scanner (created with new_scan().create_batch_scanner())
/// - Returns an empty table (with correct schema) if no records are available
/// - When timeout expires, returns an empty table (NOT an error)
- fn poll(&self, py: Python, timeout_ms: i64) -> PyResult<Py<PyAny>> {
+ fn poll_arrow(&self, py: Python, timeout_ms: i64) -> PyResult<Py<PyAny>> {
+ let inner_batch = self.inner_batch.as_ref().ok_or_else(|| {
+ FlussError::new_err(
+ "Batch-based scanner not available. Use
new_scan().create_batch_scanner() to create a scanner \
+ that supports poll_arrow().",
+ )
+ })?;
+
if timeout_ms < 0 {
return Err(FlussError::new_err(format!(
"timeout_ms must be non-negative, got: {timeout_ms}"
@@ -1459,7 +1840,7 @@ impl LogScanner {
let timeout = Duration::from_millis(timeout_ms as u64);
let scan_batches = py
- .detach(|| TOKIO_RUNTIME.block_on(async {
self.inner.poll(timeout).await }))
+ .detach(|| TOKIO_RUNTIME.block_on(async {
inner_batch.poll(timeout).await }))
.map_err(|e| FlussError::new_err(e.to_string()))?;
// Convert ScanBatch to Arrow batches
@@ -1475,11 +1856,11 @@ impl LogScanner {
Utils::combine_batches_to_table(py, arrow_batches)
}
- /// Create an empty PyArrow table with the correct schema
+ /// Create an empty PyArrow table with the correct (projected) schema
fn create_empty_table(&self, py: Python) -> PyResult<Py<PyAny>> {
- let arrow_schema = to_arrow_schema(self.table_info.get_row_type())
- .map_err(|e| FlussError::new_err(format!("Failed to get arrow
schema: {e}")))?;
- let py_schema = arrow_schema
+ // Use the projected schema stored in the scanner
+ let py_schema = self
+ .projected_schema
.as_ref()
.to_pyarrow(py)
.map_err(|e| FlussError::new_err(format!("Failed to convert
schema: {e}")))?;
@@ -1498,16 +1879,41 @@ impl LogScanner {
}
impl LogScanner {
- /// Create LogScanner from core RecordBatchLogScanner
- pub fn from_core(
- inner_scanner: fcore::client::RecordBatchLogScanner,
+    /// Build a `LogScanner` backed by a record-oriented core scanner.
+    ///
+    /// Only the record slot (used by `poll()`) is filled. The batch slot is left
+    /// empty, so `to_arrow()`, `poll_batches()` and `poll_arrow()` report that a
+    /// batch-based scanner is not available.
+    pub fn from_log_scanner(
+        inner_scanner: fcore::client::LogScanner,
+        admin: fcore::client::FlussAdmin,
+        table_info: fcore::metadata::TableInfo,
+        projected_schema: SchemaRef,
+        projected_row_type: fcore::metadata::RowType,
+    ) -> Self {
+        let inner = Some(inner_scanner);
+        Self {
+            inner,
+            inner_batch: None,
+            start_timestamp: None,
+            end_timestamp: None,
+            admin,
+            table_info,
+            projected_schema,
+            projected_row_type,
+        }
+    }
+
+ /// Create LogScanner for batch-based scanning
+ pub fn from_batch_scanner(
+ inner_batch_scanner: fcore::client::RecordBatchLogScanner,
admin: fcore::client::FlussAdmin,
table_info: fcore::metadata::TableInfo,
+ projected_schema: SchemaRef,
+ projected_row_type: fcore::metadata::RowType,
) -> Self {
Self {
- inner: inner_scanner,
+ inner: None,
+ inner_batch: Some(inner_batch_scanner),
admin,
table_info,
+ projected_schema,
+ projected_row_type,
start_timestamp: None,
end_timestamp: None,
}
diff --git a/bindings/python/src/utils.rs b/bindings/python/src/utils.rs
index ee32c9c..c92f1b9 100644
--- a/bindings/python/src/utils.rs
+++ b/bindings/python/src/utils.rs
@@ -203,20 +203,15 @@ impl Utils {
py: Python,
batches: Vec<Arc<arrow::record_batch::RecordBatch>>,
) -> PyResult<Py<PyAny>> {
- use arrow_array::RecordBatch as ArrowArrayRecordBatch;
-
let py_batches: Result<Vec<Py<PyAny>>, _> = batches
.iter()
.map(|batch| {
- ArrowArrayRecordBatch::try_new(batch.schema().clone(),
batch.columns().to_vec())
- .map_err(|e| FlussError::new_err(format!("Failed to
convert RecordBatch: {e}")))
- .and_then(|b| {
- ToPyArrow::to_pyarrow(&b, py)
- .map(|x| x.into())
- .map_err(|e| {
- FlussError::new_err(format!("Failed to convert
to PyObject: {e}"))
- })
- })
+ // Just dereference the Arc - no need to recreate the batch
+ batch
+ .as_ref()
+ .to_pyarrow(py)
+ .map(|x| x.into())
+ .map_err(|e| FlussError::new_err(format!("Failed to
convert to PyObject: {e}")))
})
.collect();