This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 0e50dc2  feat: Support different poll methods (#246)
0e50dc2 is described below

commit 0e50dc21220a55bb5d186ee9c9347dc20e8bfc12
Author: Anton Borisov <[email protected]>
AuthorDate: Thu Feb 5 22:27:58 2026 +0000

    feat: Support different poll methods (#246)
---
 bindings/python/example/example.py |  92 +++++-
 bindings/python/fluss/__init__.pyi | 244 ++++++++++++++-
 bindings/python/src/lib.rs         |   4 +
 bindings/python/src/metadata.rs    |  51 +++
 bindings/python/src/table.rs       | 620 ++++++++++++++++++++++++++++++-------
 bindings/python/src/utils.rs       |  17 +-
 6 files changed, 885 insertions(+), 143 deletions(-)

diff --git a/bindings/python/example/example.py 
b/bindings/python/example/example.py
index c359425..9cb8f43 100644
--- a/bindings/python/example/example.py
+++ b/bindings/python/example/example.py
@@ -236,31 +236,32 @@ async def main():
         print(f"Error during writing: {e}")
 
     # Now scan the table to verify data was written
-    print("\n--- Scanning table ---")
+    print("\n--- Scanning table (batch scanner) ---")
     try:
-        log_scanner = await table.new_log_scanner()
-        print(f"Created log scanner: {log_scanner}")
+        # Use new_scan().create_batch_scanner() for batch-based operations
+        batch_scanner = await table.new_scan().create_batch_scanner()
+        print(f"Created batch scanner: {batch_scanner}")
 
         # Subscribe to scan from earliest to latest
         # start_timestamp=None (earliest), end_timestamp=None (latest)
-        log_scanner.subscribe(None, None)
+        batch_scanner.subscribe(None, None)
 
         print("Scanning results using to_arrow():")
 
         # Try to get as PyArrow Table
         try:
-            pa_table_result = log_scanner.to_arrow()
+            pa_table_result = batch_scanner.to_arrow()
             print(f"\nAs PyArrow Table: {pa_table_result}")
         except Exception as e:
             print(f"Could not convert to PyArrow: {e}")
 
         # Let's subscribe from the beginning again.
         # Reset subscription
-        log_scanner.subscribe(None, None)
+        batch_scanner.subscribe(None, None)
 
         # Try to get as Pandas DataFrame
         try:
-            df_result = log_scanner.to_pandas()
+            df_result = batch_scanner.to_pandas()
             print(f"\nAs Pandas DataFrame:\n{df_result}")
         except Exception as e:
             print(f"Could not convert to Pandas: {e}")
@@ -270,15 +271,15 @@ async def main():
 
         # TODO: support to_duckdb()
 
-        # Test the new poll() method for incremental reading
-        print("\n--- Testing poll() method ---")
+        # Test poll_arrow() method for incremental reading as Arrow Table
+        print("\n--- Testing poll_arrow() method ---")
         # Reset subscription to start from the beginning
-        log_scanner.subscribe(None, None)
+        batch_scanner.subscribe(None, None)
 
         # Poll with a timeout of 5000ms (5 seconds)
-        # Note: poll() returns an empty table (not an error) on timeout
+        # Note: poll_arrow() returns an empty table (not an error) on timeout
         try:
-            poll_result = log_scanner.poll(5000)
+            poll_result = batch_scanner.poll_arrow(5000)
             print(f"Number of rows: {poll_result.num_rows}")
 
             if poll_result.num_rows > 0:
@@ -289,11 +290,58 @@ async def main():
                 # Empty table still has schema
                 print(f"Schema: {poll_result.schema}")
 
+        except Exception as e:
+            print(f"Error during poll_arrow: {e}")
+
+        # Test poll_batches() method for batches with metadata
+        print("\n--- Testing poll_batches() method ---")
+        batch_scanner.subscribe(None, None)
+
+        try:
+            batches = batch_scanner.poll_batches(5000)
+            print(f"Number of batches: {len(batches)}")
+
+            for i, batch in enumerate(batches):
+                print(f"  Batch {i}: bucket={batch.bucket}, "
+                      f"offsets={batch.base_offset}-{batch.last_offset}, "
+                      f"rows={batch.batch.num_rows}")
+
+        except Exception as e:
+            print(f"Error during poll_batches: {e}")
+
+    except Exception as e:
+        print(f"Error during batch scanning: {e}")
+
+    # Test record-based scanning with poll()
+    print("\n--- Scanning table (record scanner) ---")
+    try:
+        # Use new_scan().create_log_scanner() for record-based operations
+        record_scanner = await table.new_scan().create_log_scanner()
+        print(f"Created record scanner: {record_scanner}")
+
+        record_scanner.subscribe(None, None)
+
+        # Poll returns List[ScanRecord] with per-record metadata
+        print("\n--- Testing poll() method (record-by-record) ---")
+        try:
+            records = record_scanner.poll(5000)
+            print(f"Number of records: {len(records)}")
+
+            # Show first few records with metadata
+            for i, record in enumerate(records[:5]):
+                print(f"  Record {i}: offset={record.offset}, "
+                      f"timestamp={record.timestamp}, "
+                      f"change_type={record.change_type}, "
+                      f"row={record.row}")
+
+            if len(records) > 5:
+                print(f"  ... and {len(records) - 5} more records")
+
         except Exception as e:
             print(f"Error during poll: {e}")
 
     except Exception as e:
-        print(f"Error during scanning: {e}")
+        print(f"Error during record scanning: {e}")
 
     # =====================================================
     # Demo: Primary Key Table with Lookup and Upsert
@@ -488,12 +536,12 @@ async def main():
         print(f"Error during delete: {e}")
         traceback.print_exc()
 
-    # Demo: Column projection
+    # Demo: Column projection using builder pattern
     print("\n--- Testing Column Projection ---")
     try:
-        # Project specific columns by index
+        # Project specific columns by index (using batch scanner for to_pandas)
         print("\n1. Projection by index [0, 1] (id, name):")
-        scanner_index = await table.new_log_scanner(project=[0, 1])
+        scanner_index = await table.new_scan().project([0, 
1]).create_batch_scanner()
         scanner_index.subscribe(None, None)
         df_projected = scanner_index.to_pandas()
         print(df_projected.head())
@@ -503,12 +551,22 @@ async def main():
 
         # Project specific columns by name (Pythonic!)
         print("\n2. Projection by name ['name', 'score'] (Pythonic):")
-        scanner_names = await table.new_log_scanner(columns=["name", "score"])
+        scanner_names = await table.new_scan() \
+            .project_by_name(["name", "score"]) \
+            .create_batch_scanner()
         scanner_names.subscribe(None, None)
         df_named = scanner_names.to_pandas()
         print(df_named.head())
         print(f"   Projected {df_named.shape[1]} columns: 
{list(df_named.columns)}")
 
+        # Test empty result schema with projection
+        print("\n3. Testing empty result schema with projection:")
+        scanner_proj = await table.new_scan().project([0, 
2]).create_batch_scanner()
+        scanner_proj.subscribe(None, None)
+        # Quick poll that may return empty
+        result = scanner_proj.poll_arrow(100)
+        print(f"   Schema columns: {result.schema.names}")
+
     except Exception as e:
         print(f"Error during projection: {e}")
 
diff --git a/bindings/python/fluss/__init__.pyi 
b/bindings/python/fluss/__init__.pyi
index c911ebe..40d18f6 100644
--- a/bindings/python/fluss/__init__.pyi
+++ b/bindings/python/fluss/__init__.pyi
@@ -17,12 +17,79 @@
 
 """Type stubs for Fluss Python bindings."""
 
+from enum import IntEnum
 from types import TracebackType
 from typing import Dict, List, Optional, Tuple
 
 import pandas as pd
 import pyarrow as pa
 
+class ChangeType(IntEnum):
+    """Represents the type of change for a record in a log."""
+
+    AppendOnly = 0
+    """Append-only operation"""
+    Insert = 1
+    """Insert operation"""
+    UpdateBefore = 2
+    """Update operation containing the previous content of the updated row"""
+    UpdateAfter = 3
+    """Update operation containing the new content of the updated row"""
+    Delete = 4
+    """Delete operation"""
+
+    def short_string(self) -> str:
+        """Returns a short string representation (+A, +I, -U, +U, -D)."""
+        ...
+
+class ScanRecord:
+    """Represents a single scan record with metadata."""
+
+    @property
+    def bucket(self) -> TableBucket:
+        """The bucket this record belongs to."""
+        ...
+    @property
+    def offset(self) -> int:
+        """The position of this record in the log."""
+        ...
+    @property
+    def timestamp(self) -> int:
+        """The timestamp of this record."""
+        ...
+    @property
+    def change_type(self) -> ChangeType:
+        """The type of change (insert, update, delete, etc.)."""
+        ...
+    @property
+    def row(self) -> Dict[str, object]:
+        """The row data as a dictionary mapping column names to values."""
+        ...
+    def __str__(self) -> str: ...
+    def __repr__(self) -> str: ...
+
+class RecordBatch:
+    """Represents a batch of records with metadata."""
+
+    @property
+    def batch(self) -> pa.RecordBatch:
+        """The Arrow RecordBatch containing the data."""
+        ...
+    @property
+    def bucket(self) -> TableBucket:
+        """The bucket this batch belongs to."""
+        ...
+    @property
+    def base_offset(self) -> int:
+        """The offset of the first record in this batch."""
+        ...
+    @property
+    def last_offset(self) -> int:
+        """The offset of the last record in this batch."""
+        ...
+    def __str__(self) -> str: ...
+    def __repr__(self) -> str: ...
+
 class Config:
     def __init__(self, properties: Optional[Dict[str, str]] = None) -> None: 
...
     @property
@@ -64,13 +131,92 @@ class FlussAdmin:
     async def get_latest_lake_snapshot(self, table_path: TablePath) -> 
LakeSnapshot: ...
     def __repr__(self) -> str: ...
 
+class TableScan:
+    """Builder for creating log scanners with flexible configuration.
+
+    Use this builder to configure projection before creating a log scanner.
+    Obtain a TableScan instance via `FlussTable.new_scan()`.
+
+    Example:
+        ```python
+        # Record-based scanning with projection
+        scanner = await table.new_scan() \\
+            .project([0, 1, 2]) \\
+            .create_log_scanner()
+
+        # Batch-based scanning with column names
+        scanner = await table.new_scan() \\
+            .project_by_name(["id", "name"]) \\
+            .create_batch_scanner()
+        ```
+    """
+
+    def project(self, indices: List[int]) -> "TableScan":
+        """Project to specific columns by their indices.
+
+        Args:
+            indices: List of column indices (0-based) to include in the scan.
+
+        Returns:
+            Self for method chaining.
+        """
+        ...
+    def project_by_name(self, names: List[str]) -> "TableScan":
+        """Project to specific columns by their names.
+
+        Args:
+            names: List of column names to include in the scan.
+
+        Returns:
+            Self for method chaining.
+        """
+        ...
+    async def create_log_scanner(self) -> LogScanner:
+        """Create a record-based log scanner.
+
+        Use this scanner with `poll()` to get individual records with metadata
+        (offset, timestamp, change_type).
+
+        Returns:
+            LogScanner for record-by-record scanning with `poll()`
+        """
+        ...
+    async def create_batch_scanner(self) -> LogScanner:
+        """Create a batch-based log scanner.
+
+        Use this scanner with `poll_arrow()` to get Arrow Tables, or with
+        `poll_batches()` to get individual batches with metadata.
+
+        Returns:
+            LogScanner for batch-based scanning with `poll_arrow()` or 
`poll_batches()`
+        """
+        ...
+    def __repr__(self) -> str: ...
+
 class FlussTable:
+    def new_scan(self) -> TableScan:
+        """Create a new table scan builder for configuring and creating log 
scanners.
+
+        Use this method to create scanners with the builder pattern:
+
+        Example:
+            ```python
+            # Record-based scanning
+            scanner = await table.new_scan() \\
+                .project([0, 1]) \\
+                .create_log_scanner()
+
+            # Batch-based scanning
+            scanner = await table.new_scan() \\
+                .project_by_name(["id", "name"]) \\
+                .create_batch_scanner()
+            ```
+
+        Returns:
+            TableScan builder for configuring the scanner.
+        """
+        ...
     async def new_append_writer(self) -> AppendWriter: ...
-    async def new_log_scanner(
-        self,
-        project: Optional[List[int]] = None,
-        columns: Optional[List[str]] = None,
-    ) -> LogScanner: ...
     def new_upsert(
         self,
         columns: Optional[List[str]] = None,
@@ -159,11 +305,93 @@ class Lookuper:
     def __repr__(self) -> str: ...
 
 class LogScanner:
+    """Scanner for reading log data from a Fluss table.
+
+    This scanner supports two modes:
+    - Record-based scanning via `poll()` - returns individual records with 
metadata
+    - Batch-based scanning via `poll_arrow()` / `poll_batches()` - returns 
Arrow batches
+
+    Create scanners using the builder pattern:
+        # Record-based scanning
+        scanner = await table.new_scan().create_log_scanner()
+
+        # Batch-based scanning
+        scanner = await table.new_scan().create_batch_scanner()
+
+        # With projection
+        scanner = await table.new_scan().project([0, 1]).create_log_scanner()
+    """
+
     def subscribe(
         self, start_timestamp: Optional[int], end_timestamp: Optional[int]
-    ) -> None: ...
-    def to_pandas(self) -> pd.DataFrame: ...
-    def to_arrow(self) -> pa.Table: ...
+    ) -> None:
+        """Subscribe to log data with timestamp range.
+
+        Args:
+            start_timestamp: Not yet supported, must be None.
+            end_timestamp: Not yet supported, must be None.
+        """
+        ...
+    def poll(self, timeout_ms: int) -> List[ScanRecord]:
+        """Poll for individual records with metadata.
+
+        Requires a record-based scanner (created with 
new_scan().create_log_scanner()).
+
+        Args:
+            timeout_ms: Timeout in milliseconds to wait for records.
+
+        Returns:
+            List of ScanRecord objects, each containing bucket, offset, 
timestamp,
+            change_type, and row data as a dictionary.
+
+        Note:
+            Returns an empty list if no records are available or timeout 
expires.
+        """
+        ...
+    def poll_batches(self, timeout_ms: int) -> List[RecordBatch]:
+        """Poll for batches with metadata.
+
+        Requires a batch-based scanner (created with 
new_scan().create_batch_scanner()).
+
+        Args:
+            timeout_ms: Timeout in milliseconds to wait for batches.
+
+        Returns:
+            List of RecordBatch objects, each containing the Arrow batch along 
with
+            bucket, base_offset, and last_offset metadata.
+
+        Note:
+            Returns an empty list if no batches are available or timeout 
expires.
+        """
+        ...
+    def poll_arrow(self, timeout_ms: int) -> pa.Table:
+        """Poll for records as an Arrow Table.
+
+        Requires a batch-based scanner (created with 
new_scan().create_batch_scanner()).
+
+        Args:
+            timeout_ms: Timeout in milliseconds to wait for records.
+
+        Returns:
+            PyArrow Table containing the polled records (batches merged).
+
+        Note:
+            Returns an empty table (with correct schema) if no records are 
available
+            or timeout expires.
+        """
+        ...
+    def to_pandas(self) -> pd.DataFrame:
+        """Convert all data to Pandas DataFrame.
+
+        Requires a batch-based scanner (created with 
new_scan().create_batch_scanner()).
+        """
+        ...
+    def to_arrow(self) -> pa.Table:
+        """Convert all data to Arrow Table.
+
+        Requires a batch-based scanner (created with 
new_scan().create_batch_scanner()).
+        """
+        ...
     def __repr__(self) -> str: ...
 
 class Schema:
diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs
index 3da0b25..ce063ab 100644
--- a/bindings/python/src/lib.rs
+++ b/bindings/python/src/lib.rs
@@ -58,6 +58,7 @@ fn _fluss(m: &Bound<'_, PyModule>) -> PyResult<()> {
     m.add_class::<TableDescriptor>()?;
     m.add_class::<FlussAdmin>()?;
     m.add_class::<FlussTable>()?;
+    m.add_class::<TableScan>()?;
     m.add_class::<AppendWriter>()?;
     m.add_class::<UpsertWriter>()?;
     m.add_class::<Lookuper>()?;
@@ -65,6 +66,9 @@ fn _fluss(m: &Bound<'_, PyModule>) -> PyResult<()> {
     m.add_class::<LogScanner>()?;
     m.add_class::<LakeSnapshot>()?;
     m.add_class::<TableBucket>()?;
+    m.add_class::<ChangeType>()?;
+    m.add_class::<ScanRecord>()?;
+    m.add_class::<RecordBatch>()?;
 
     // Register exception types
     m.add_class::<FlussError>()?;
diff --git a/bindings/python/src/metadata.rs b/bindings/python/src/metadata.rs
index f422696..f39f9d4 100644
--- a/bindings/python/src/metadata.rs
+++ b/bindings/python/src/metadata.rs
@@ -19,6 +19,57 @@ use crate::*;
 use pyo3::types::PyDict;
 use std::collections::HashMap;
 
+/// Represents the type of change for a record in a log
+#[pyclass(eq, eq_int)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub enum ChangeType {
+    /// Append-only operation
+    AppendOnly = 0,
+    /// Insert operation
+    Insert = 1,
+    /// Update operation containing the previous content of the updated row
+    UpdateBefore = 2,
+    /// Update operation containing the new content of the updated row
+    UpdateAfter = 3,
+    /// Delete operation
+    Delete = 4,
+}
+
+#[pymethods]
+impl ChangeType {
+    /// Returns a short string representation of this ChangeType
+    pub fn short_string(&self) -> &'static str {
+        match self {
+            ChangeType::AppendOnly => "+A",
+            ChangeType::Insert => "+I",
+            ChangeType::UpdateBefore => "-U",
+            ChangeType::UpdateAfter => "+U",
+            ChangeType::Delete => "-D",
+        }
+    }
+
+    fn __str__(&self) -> &'static str {
+        self.short_string()
+    }
+
+    fn __repr__(&self) -> String {
+        format!("ChangeType.{:?}", self)
+    }
+}
+
+impl ChangeType {
+    /// Convert from core ChangeType
+    pub fn from_core(change_type: fcore::record::ChangeType) -> Self {
+        match change_type {
+            fcore::record::ChangeType::AppendOnly => ChangeType::AppendOnly,
+            fcore::record::ChangeType::Insert => ChangeType::Insert,
+            fcore::record::ChangeType::UpdateBefore => 
ChangeType::UpdateBefore,
+            fcore::record::ChangeType::UpdateAfter => ChangeType::UpdateAfter,
+            fcore::record::ChangeType::Delete => ChangeType::Delete,
+        }
+    }
+}
+
 /// Represents a table path with database and table name
 #[pyclass]
 #[derive(Clone)]
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index 4554ca1..30c7ce0 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -17,8 +17,9 @@
 
 use crate::TOKIO_RUNTIME;
 use crate::*;
-use arrow::array::RecordBatch;
+use arrow::array::RecordBatch as ArrowRecordBatch;
 use arrow_pyarrow::{FromPyArrow, ToPyArrow};
+use arrow_schema::SchemaRef;
 use fluss::client::EARLIEST_OFFSET;
 use fluss::record::to_arrow_schema;
 use fluss::rpc::message::OffsetSpec;
@@ -38,6 +39,123 @@ const MICROS_PER_DAY: i64 = 86_400_000_000;
 const NANOS_PER_MILLI: i64 = 1_000_000;
 const NANOS_PER_MICRO: i64 = 1_000;
 
+/// Represents a single scan record with metadata
+#[pyclass]
+pub struct ScanRecord {
+    #[pyo3(get)]
+    bucket: TableBucket,
+    #[pyo3(get)]
+    offset: i64,
+    #[pyo3(get)]
+    timestamp: i64,
+    #[pyo3(get)]
+    change_type: ChangeType,
+    /// Store row as a Python dict directly
+    row_dict: Py<pyo3::types::PyDict>,
+}
+
+#[pymethods]
+impl ScanRecord {
+    /// Get the row data as a dictionary
+    #[getter]
+    pub fn row(&self, py: Python) -> Py<pyo3::types::PyDict> {
+        self.row_dict.clone_ref(py)
+    }
+
+    fn __str__(&self) -> String {
+        format!(
+            "ScanRecord(bucket={}, offset={}, timestamp={}, change_type={})",
+            self.bucket.__str__(),
+            self.offset,
+            self.timestamp,
+            self.change_type.short_string()
+        )
+    }
+
+    fn __repr__(&self) -> String {
+        self.__str__()
+    }
+}
+
+impl ScanRecord {
+    /// Create a ScanRecord from core types
+    pub fn from_core(
+        py: Python,
+        bucket: &fcore::metadata::TableBucket,
+        record: &fcore::record::ScanRecord,
+        row_type: &fcore::metadata::RowType,
+    ) -> PyResult<Self> {
+        let fields = row_type.fields();
+        let row = record.row();
+        let dict = pyo3::types::PyDict::new(py);
+
+        for (pos, field) in fields.iter().enumerate() {
+            let value = datum_to_python_value(py, row, pos, 
field.data_type())?;
+            dict.set_item(field.name(), value)?;
+        }
+
+        Ok(ScanRecord {
+            bucket: TableBucket::from_core(bucket.clone()),
+            offset: record.offset(),
+            timestamp: record.timestamp(),
+            change_type: ChangeType::from_core(*record.change_type()),
+            row_dict: dict.unbind(),
+        })
+    }
+}
+
+/// Represents a batch of records with metadata
+#[pyclass]
+pub struct RecordBatch {
+    batch: Arc<ArrowRecordBatch>,
+    #[pyo3(get)]
+    bucket: TableBucket,
+    #[pyo3(get)]
+    base_offset: i64,
+    #[pyo3(get)]
+    last_offset: i64,
+}
+
+#[pymethods]
+impl RecordBatch {
+    /// Get the Arrow RecordBatch as PyArrow RecordBatch
+    #[getter]
+    pub fn batch(&self, py: Python) -> PyResult<Py<PyAny>> {
+        let pyarrow_batch = self
+            .batch
+            .as_ref()
+            .to_pyarrow(py)
+            .map_err(|e| FlussError::new_err(format!("Failed to convert batch: 
{e}")))?;
+        Ok(pyarrow_batch.unbind())
+    }
+
+    fn __str__(&self) -> String {
+        format!(
+            "RecordBatch(bucket={}, base_offset={}, last_offset={}, rows={})",
+            self.bucket.__str__(),
+            self.base_offset,
+            self.last_offset,
+            self.batch.num_rows()
+        )
+    }
+
+    fn __repr__(&self) -> String {
+        self.__str__()
+    }
+}
+
+impl RecordBatch {
+    /// Create a RecordBatch from core ScanBatch
+    pub fn from_scan_batch(scan_batch: fcore::record::ScanBatch) -> Self {
+        RecordBatch {
+            bucket: TableBucket::from_core(scan_batch.bucket().clone()),
+            base_offset: scan_batch.base_offset(),
+            last_offset: scan_batch.last_offset(),
+            batch: Arc::new(scan_batch.into_batch()),
+        }
+    }
+}
+
 /// Represents a Fluss table for data operations
 #[pyclass]
 pub struct FlussTable {
@@ -48,14 +166,233 @@ pub struct FlussTable {
     has_primary_key: bool,
 }
 
+/// Builder for creating log scanners with flexible configuration.
+///
+/// Use this builder to configure projection, and in the future, filters
+/// before creating a log scanner.
+#[pyclass]
+pub struct TableScan {
+    connection: Arc<fcore::client::FlussConnection>,
+    metadata: Arc<fcore::client::Metadata>,
+    table_info: fcore::metadata::TableInfo,
+    projection: Option<ProjectionType>,
+}
+
+/// Scanner type for internal use
+enum ScannerType {
+    Record,
+    Batch,
+}
+
+#[pymethods]
+impl TableScan {
+    /// Project to specific columns by their indices.
+    ///
+    /// Args:
+    ///     indices: List of column indices (0-based) to include in the scan.
+    ///
+    /// Returns:
+    ///     Self for method chaining.
+    pub fn project(mut slf: PyRefMut<'_, Self>, indices: Vec<usize>) -> 
PyRefMut<'_, Self> {
+        slf.projection = Some(ProjectionType::Indices(indices));
+        slf
+    }
+
+    /// Project to specific columns by their names.
+    ///
+    /// Args:
+    ///     names: List of column names to include in the scan.
+    ///
+    /// Returns:
+    ///     Self for method chaining.
+    pub fn project_by_name(mut slf: PyRefMut<'_, Self>, names: Vec<String>) -> 
PyRefMut<'_, Self> {
+        slf.projection = Some(ProjectionType::Names(names));
+        slf
+    }
+
+    /// Create a record-based log scanner.
+    ///
+    /// Use this scanner with `poll()` to get individual records with metadata
+    /// (offset, timestamp, change_type).
+    ///
+    /// Returns:
+    ///     LogScanner for record-by-record scanning with `poll()`
+    pub fn create_log_scanner<'py>(&self, py: Python<'py>) -> 
PyResult<Bound<'py, PyAny>> {
+        self.create_scanner_internal(py, ScannerType::Record)
+    }
+
+    /// Create a batch-based log scanner.
+    ///
+    /// Use this scanner with `poll_arrow()` to get Arrow Tables, or with
+    /// `poll_batches()` to get individual batches with metadata.
+    ///
+    /// Returns:
+    ///     LogScanner for batch-based scanning with `poll_arrow()` or 
`poll_batches()`
+    pub fn create_batch_scanner<'py>(&self, py: Python<'py>) -> 
PyResult<Bound<'py, PyAny>> {
+        self.create_scanner_internal(py, ScannerType::Batch)
+    }
+
+    fn __repr__(&self) -> String {
+        format!(
+            "TableScan(table={}.{})",
+            self.table_info.table_path.database(),
+            self.table_info.table_path.table()
+        )
+    }
+}
+
+impl TableScan {
+    fn create_scanner_internal<'py>(
+        &self,
+        py: Python<'py>,
+        scanner_type: ScannerType,
+    ) -> PyResult<Bound<'py, PyAny>> {
+        let conn = self.connection.clone();
+        let metadata = self.metadata.clone();
+        let table_info = self.table_info.clone();
+        let projection = self.projection.clone();
+
+        future_into_py(py, async move {
+            let fluss_table = fcore::client::FlussTable::new(&conn, metadata, 
table_info.clone());
+
+            let projection_indices = resolve_projection_indices(&projection, 
&table_info)?;
+            let table_scan = apply_projection(fluss_table.new_scan(), 
projection)?;
+
+            let admin = conn
+                .get_admin()
+                .await
+                .map_err(|e| FlussError::new_err(e.to_string()))?;
+
+            let (projected_schema, projected_row_type) =
+                calculate_projected_types(&table_info, projection_indices)?;
+
+            let py_scanner = match scanner_type {
+                ScannerType::Record => {
+                    let rust_scanner = 
table_scan.create_log_scanner().map_err(|e| {
+                        FlussError::new_err(format!("Failed to create log 
scanner: {e}"))
+                    })?;
+                    LogScanner::from_log_scanner(
+                        rust_scanner,
+                        admin,
+                        table_info,
+                        projected_schema,
+                        projected_row_type,
+                    )
+                }
+                ScannerType::Batch => {
+                    let rust_scanner =
+                        
table_scan.create_record_batch_log_scanner().map_err(|e| {
+                            FlussError::new_err(format!("Failed to create 
batch scanner: {e}"))
+                        })?;
+                    LogScanner::from_batch_scanner(
+                        rust_scanner,
+                        admin,
+                        table_info,
+                        projected_schema,
+                        projected_row_type,
+                    )
+                }
+            };
+
+            Python::attach(|py| Py::new(py, py_scanner))
+        })
+    }
+}
+
 /// Internal enum to represent different projection types
+#[derive(Clone)]
 enum ProjectionType {
     Indices(Vec<usize>),
     Names(Vec<String>),
 }
 
+/// Resolve projection to column indices
+fn resolve_projection_indices(
+    projection: &Option<ProjectionType>,
+    table_info: &fcore::metadata::TableInfo,
+) -> PyResult<Option<Vec<usize>>> {
+    match projection {
+        Some(ProjectionType::Indices(indices)) => Ok(Some(indices.clone())),
+        Some(ProjectionType::Names(names)) => {
+            let schema = table_info.get_schema();
+            let columns = schema.columns();
+            let mut indices = Vec::with_capacity(names.len());
+            for name in names {
+                let idx = columns
+                    .iter()
+                    .position(|c| c.name() == name)
+                    .ok_or_else(|| FlussError::new_err(format!("Column '{}' 
not found", name)))?;
+                indices.push(idx);
+            }
+            Ok(Some(indices))
+        }
+        None => Ok(None),
+    }
+}
+
+/// Apply projection to table scan
+fn apply_projection(
+    table_scan: fcore::client::TableScan,
+    projection: Option<ProjectionType>,
+) -> PyResult<fcore::client::TableScan> {
+    match projection {
+        Some(ProjectionType::Indices(indices)) => table_scan
+            .project(&indices)
+            .map_err(|e| FlussError::new_err(format!("Failed to project 
columns: {e}"))),
+        Some(ProjectionType::Names(names)) => {
+            let column_name_refs: Vec<&str> = names.iter().map(|s| 
s.as_str()).collect();
+            table_scan
+                .project_by_name(&column_name_refs)
+                .map_err(|e| FlussError::new_err(format!("Failed to project 
columns: {e}")))
+        }
+        None => Ok(table_scan),
+    }
+}
+
+/// Calculate projected schema and row type from projection indices
+fn calculate_projected_types(
+    table_info: &fcore::metadata::TableInfo,
+    projection_indices: Option<Vec<usize>>,
+) -> PyResult<(SchemaRef, fcore::metadata::RowType)> {
+    let full_schema = to_arrow_schema(table_info.get_row_type())
+        .map_err(|e| FlussError::new_err(format!("Failed to get arrow schema: 
{e}")))?;
+    let full_row_type = table_info.get_row_type();
+
+    match projection_indices {
+        Some(indices) => {
+            let arrow_fields: Vec<_> = indices
+                .iter()
+                .map(|&i| full_schema.field(i).clone())
+                .collect();
+            let row_fields: Vec<_> = indices
+                .iter()
+                .map(|&i| full_row_type.fields()[i].clone())
+                .collect();
+            Ok((
+                Arc::new(arrow_schema::Schema::new(arrow_fields)),
+                fcore::metadata::RowType::new(row_fields),
+            ))
+        }
+        None => Ok((full_schema, full_row_type.clone())),
+    }
+}
+
 #[pymethods]
 impl FlussTable {
+    /// Create a new table scan builder for configuring and creating log 
scanners.
+    ///
+    /// Use this method to create scanners with the builder pattern:
+    /// Returns:
+    ///     TableScan builder for configuring the scanner.
+    pub fn new_scan(&self) -> TableScan {
+        TableScan {
+            connection: self.connection.clone(),
+            metadata: self.metadata.clone(),
+            table_info: self.table_info.clone(),
+            projection: None,
+        }
+    }
+
     /// Create a new append writer for the table
     fn new_append_writer<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, 
PyAny>> {
         let conn = self.connection.clone();
@@ -79,41 +416,6 @@ impl FlussTable {
         })
     }
 
-    /// Create a new log scanner for the table.
-    ///
-    /// Args:
-    ///     project: Optional list of column indices (0-based) to include in 
the scan.
-    ///     columns: Optional list of column names to include in the scan.
-    ///
-    /// Returns:
-    ///     LogScanner, optionally with projection applied
-    ///
-    /// Note:
-    ///     Specify only one of 'project' or 'columns'.
-    ///     If neither is specified, all columns are included.
-    ///     Rust side will validate the projection parameters.
-    ///
-    #[pyo3(signature = (project=None, columns=None))]
-    pub fn new_log_scanner<'py>(
-        &self,
-        py: Python<'py>,
-        project: Option<Vec<usize>>,
-        columns: Option<Vec<String>>,
-    ) -> PyResult<Bound<'py, PyAny>> {
-        let projection = match (project, columns) {
-            (Some(_), Some(_)) => {
-                return Err(FlussError::new_err(
-                    "Specify only one of 'project' or 'columns'".to_string(),
-                ));
-            }
-            (Some(indices), None) => Some(ProjectionType::Indices(indices)),
-            (None, Some(names)) => Some(ProjectionType::Names(names)),
-            (None, None) => None,
-        };
-
-        self.create_log_scanner_internal(py, projection)
-    }
-
     /// Get table information
     pub fn get_table_info(&self) -> TableInfo {
         TableInfo::from_core(self.table_info.clone())
@@ -219,55 +521,6 @@ impl FlussTable {
             has_primary_key,
         }
     }
-
-    /// Internal helper to create log scanner with optional projection
-    fn create_log_scanner_internal<'py>(
-        &self,
-        py: Python<'py>,
-        projection: Option<ProjectionType>,
-    ) -> PyResult<Bound<'py, PyAny>> {
-        let conn = self.connection.clone();
-        let metadata = self.metadata.clone();
-        let table_info = self.table_info.clone();
-
-        future_into_py(py, async move {
-            let fluss_table =
-                fcore::client::FlussTable::new(&conn, metadata.clone(), 
table_info.clone());
-
-            let mut table_scan = fluss_table.new_scan();
-
-            // Apply projection if specified
-            if let Some(proj) = projection {
-                table_scan = match proj {
-                    ProjectionType::Indices(indices) => {
-                        table_scan.project(&indices).map_err(|e| {
-                            FlussError::new_err(format!("Failed to project 
columns: {e}"))
-                        })?
-                    }
-                    ProjectionType::Names(names) => {
-                        // Convert Vec<String> to Vec<&str> for the API
-                        let column_name_refs: Vec<&str> =
-                            names.iter().map(|s| s.as_str()).collect();
-                        
table_scan.project_by_name(&column_name_refs).map_err(|e| {
-                            FlussError::new_err(format!("Failed to project 
columns: {e}"))
-                        })?
-                    }
-                };
-            }
-
-            let rust_scanner = table_scan
-                .create_record_batch_log_scanner()
-                .map_err(|e| FlussError::new_err(format!("Failed to create log 
scanner: {e}")))?;
-
-            let admin = conn
-                .get_admin()
-                .await
-                .map_err(|e| FlussError::new_err(e.to_string()))?;
-
-            let py_scanner = LogScanner::from_core(rust_scanner, admin, 
table_info.clone());
-            Python::attach(|py| Py::new(py, py_scanner))
-        })
-    }
 }
 
 /// Writer for appending data to a Fluss table
@@ -295,7 +548,7 @@ impl AppendWriter {
     pub fn write_arrow_batch(&self, py: Python, batch: Py<PyAny>) -> 
PyResult<()> {
         // This shares the underlying Arrow buffers without copying data
         let batch_bound = batch.bind(py);
-        let rust_batch: RecordBatch = 
FromPyArrow::from_pyarrow_bound(batch_bound)
+        let rust_batch: ArrowRecordBatch = 
FromPyArrow::from_pyarrow_bound(batch_bound)
             .map_err(|e| FlussError::new_err(format!("Failed to convert 
RecordBatch: {e}")))?;
 
         let inner = self.inner.clone();
@@ -1303,12 +1556,23 @@ fn get_type_name(value: &Bound<PyAny>) -> String {
         .unwrap_or_else(|_| "unknown".to_string())
 }
 
-/// Scanner for reading log data from a Fluss table
+/// Scanner for reading log data from a Fluss table.
+///
+/// This scanner supports two modes:
+/// - Record-based scanning via `poll()` - returns individual records with 
metadata
+/// - Batch-based scanning via `poll_arrow()` / `poll_batches()` - returns 
Arrow batches
 #[pyclass]
 pub struct LogScanner {
-    inner: fcore::client::RecordBatchLogScanner,
+    /// Record-based scanner for poll()
+    inner: Option<fcore::client::LogScanner>,
+    /// Batch-based scanner for poll_arrow/poll_batches
+    inner_batch: Option<fcore::client::RecordBatchLogScanner>,
     admin: fcore::client::FlussAdmin,
     table_info: fcore::metadata::TableInfo,
+    /// The projected Arrow schema to use for empty table creation
+    projected_schema: SchemaRef,
+    /// The projected row type to use for record-based scanning
+    projected_row_type: fcore::metadata::RowType,
     #[allow(dead_code)]
     start_timestamp: Option<i64>,
     #[allow(dead_code)]
@@ -1338,19 +1602,40 @@ impl LogScanner {
         for bucket_id in 0..num_buckets {
             let start_offset = EARLIEST_OFFSET;
 
-            TOKIO_RUNTIME.block_on(async {
-                self.inner
-                    .subscribe(bucket_id, start_offset)
-                    .await
-                    .map_err(|e| FlussError::new_err(e.to_string()))
-            })?;
+            // Subscribe to the appropriate scanner
+            if let Some(ref inner) = self.inner {
+                TOKIO_RUNTIME.block_on(async {
+                    inner
+                        .subscribe(bucket_id, start_offset)
+                        .await
+                        .map_err(|e| FlussError::new_err(e.to_string()))
+                })?;
+            } else if let Some(ref inner_batch) = self.inner_batch {
+                TOKIO_RUNTIME.block_on(async {
+                    inner_batch
+                        .subscribe(bucket_id, start_offset)
+                        .await
+                        .map_err(|e| FlussError::new_err(e.to_string()))
+                })?;
+            } else {
+                return Err(FlussError::new_err("No scanner available"));
+            }
         }
 
         Ok(())
     }
 
+    /// Convert all data to a PyArrow Table
+    ///
+    /// Note: Requires a batch-based scanner (created with 
new_scan().create_batch_scanner()).
     fn to_arrow(&self, py: Python) -> PyResult<Py<PyAny>> {
+        let inner_batch = self.inner_batch.as_ref().ok_or_else(|| {
+            FlussError::new_err(
+                "Batch-based scanner not available. Use 
new_scan().create_batch_scanner() to create a scanner \
+                 that supports to_arrow().",
+            )
+        })?;
+
         let mut all_batches = Vec::new();
 
         let num_buckets = self.table_info.get_num_buckets();
@@ -1378,7 +1663,7 @@ impl LogScanner {
             let scan_batches = py
                 .detach(|| {
                     TOKIO_RUNTIME
-                        .block_on(async { 
self.inner.poll(Duration::from_millis(500)).await })
+                        .block_on(async { 
inner_batch.poll(Duration::from_millis(500)).await })
                 })
                 .map_err(|e| FlussError::new_err(e.to_string()))?;
 
@@ -1439,18 +1724,114 @@ impl LogScanner {
         Ok(df)
     }
 
-    /// Poll for new records with the specified timeout
+    /// Poll for individual records with metadata.
     ///
     /// Args:
     ///     timeout_ms: Timeout in milliseconds to wait for records
     ///
     /// Returns:
-    ///     PyArrow Table containing the polled records
+    ///     List of ScanRecord objects, each containing bucket, offset, 
timestamp,
+    ///     change_type, and row data as a dictionary.
+    ///
+    /// Note:
+    ///     - Requires a record-based scanner (created with 
new_scan().create_log_scanner())
+    ///     - Returns an empty list if no records are available
+    ///     - When timeout expires, returns an empty list (NOT an error)
+    fn poll(&self, py: Python, timeout_ms: i64) -> PyResult<Vec<ScanRecord>> {
+        let inner = self.inner.as_ref().ok_or_else(|| {
+            FlussError::new_err(
+                "Record-based scanner not available. Use 
new_scan().create_log_scanner() to create a scanner \
+                 that supports poll().",
+            )
+        })?;
+
+        if timeout_ms < 0 {
+            return Err(FlussError::new_err(format!(
+                "timeout_ms must be non-negative, got: {timeout_ms}"
+            )));
+        }
+
+        let timeout = Duration::from_millis(timeout_ms as u64);
+        let scan_records = py
+            .detach(|| TOKIO_RUNTIME.block_on(async { 
inner.poll(timeout).await }))
+            .map_err(|e| FlussError::new_err(e.to_string()))?;
+
+        // Convert ScanRecords to Python ScanRecord list
+        // Use projected_row_type to handle column projection correctly
+        let row_type = &self.projected_row_type;
+        let mut result = Vec::new();
+
+        for (bucket, records) in scan_records.into_records_by_buckets() {
+            for record in records {
+                let scan_record = ScanRecord::from_core(py, &bucket, &record, 
row_type)?;
+                result.push(scan_record);
+            }
+        }
+
+        Ok(result)
+    }
+
+    /// Poll for batches with metadata.
+    ///
+    /// Args:
+    ///     timeout_ms: Timeout in milliseconds to wait for batches
+    ///
+    /// Returns:
+    ///     List of RecordBatch objects, each containing the Arrow batch along 
with
+    ///     bucket, base_offset, and last_offset metadata.
     ///
     /// Note:
+    ///     - Requires a batch-based scanner (created with 
new_scan().create_batch_scanner())
+    ///     - Returns an empty list if no batches are available
+    ///     - When timeout expires, returns an empty list (NOT an error)
+    fn poll_batches(&self, py: Python, timeout_ms: i64) -> 
PyResult<Vec<RecordBatch>> {
+        let inner_batch = self.inner_batch.as_ref().ok_or_else(|| {
+            FlussError::new_err(
+                "Batch-based scanner not available. Use 
new_scan().create_batch_scanner() to create a scanner \
+                 that supports poll_batches().",
+            )
+        })?;
+
+        if timeout_ms < 0 {
+            return Err(FlussError::new_err(format!(
+                "timeout_ms must be non-negative, got: {timeout_ms}"
+            )));
+        }
+
+        let timeout = Duration::from_millis(timeout_ms as u64);
+        let scan_batches = py
+            .detach(|| TOKIO_RUNTIME.block_on(async { 
inner_batch.poll(timeout).await }))
+            .map_err(|e| FlussError::new_err(e.to_string()))?;
+
+        // Convert ScanBatch to RecordBatch with metadata
+        let result = scan_batches
+            .into_iter()
+            .map(RecordBatch::from_scan_batch)
+            .collect();
+
+        Ok(result)
+    }
+
+    /// Poll for new records as an Arrow Table.
+    ///
+    /// Args:
+    ///     timeout_ms: Timeout in milliseconds to wait for records
+    ///
+    /// Returns:
+    ///     PyArrow Table containing the polled records (batches merged)
+    ///
+    /// Note:
+    ///     - Requires a batch-based scanner (created with 
new_scan().create_batch_scanner())
     ///     - Returns an empty table (with correct schema) if no records are 
available
     ///     - When timeout expires, returns an empty table (NOT an error)
-    fn poll(&self, py: Python, timeout_ms: i64) -> PyResult<Py<PyAny>> {
+    fn poll_arrow(&self, py: Python, timeout_ms: i64) -> PyResult<Py<PyAny>> {
+        let inner_batch = self.inner_batch.as_ref().ok_or_else(|| {
+            FlussError::new_err(
+                "Batch-based scanner not available. Use 
new_scan().create_batch_scanner() to create a scanner \
+                 that supports poll_arrow().",
+            )
+        })?;
+
         if timeout_ms < 0 {
             return Err(FlussError::new_err(format!(
                 "timeout_ms must be non-negative, got: {timeout_ms}"
@@ -1459,7 +1840,7 @@ impl LogScanner {
 
         let timeout = Duration::from_millis(timeout_ms as u64);
         let scan_batches = py
-            .detach(|| TOKIO_RUNTIME.block_on(async { 
self.inner.poll(timeout).await }))
+            .detach(|| TOKIO_RUNTIME.block_on(async { 
inner_batch.poll(timeout).await }))
             .map_err(|e| FlussError::new_err(e.to_string()))?;
 
         // Convert ScanBatch to Arrow batches
@@ -1475,11 +1856,11 @@ impl LogScanner {
         Utils::combine_batches_to_table(py, arrow_batches)
     }
 
-    /// Create an empty PyArrow table with the correct schema
+    /// Create an empty PyArrow table with the correct (projected) schema
     fn create_empty_table(&self, py: Python) -> PyResult<Py<PyAny>> {
-        let arrow_schema = to_arrow_schema(self.table_info.get_row_type())
-            .map_err(|e| FlussError::new_err(format!("Failed to get arrow 
schema: {e}")))?;
-        let py_schema = arrow_schema
+        // Use the projected schema stored in the scanner
+        let py_schema = self
+            .projected_schema
             .as_ref()
             .to_pyarrow(py)
             .map_err(|e| FlussError::new_err(format!("Failed to convert 
schema: {e}")))?;
@@ -1498,16 +1879,41 @@ impl LogScanner {
 }
 
 impl LogScanner {
-    /// Create LogScanner from core RecordBatchLogScanner
-    pub fn from_core(
-        inner_scanner: fcore::client::RecordBatchLogScanner,
+    /// Create LogScanner for record-based scanning
+    pub fn from_log_scanner(
+        inner_scanner: fcore::client::LogScanner,
+        admin: fcore::client::FlussAdmin,
+        table_info: fcore::metadata::TableInfo,
+        projected_schema: SchemaRef,
+        projected_row_type: fcore::metadata::RowType,
+    ) -> Self {
+        Self {
+            inner: Some(inner_scanner),
+            inner_batch: None,
+            admin,
+            table_info,
+            projected_schema,
+            projected_row_type,
+            start_timestamp: None,
+            end_timestamp: None,
+        }
+    }
+
+    /// Create LogScanner for batch-based scanning
+    pub fn from_batch_scanner(
+        inner_batch_scanner: fcore::client::RecordBatchLogScanner,
         admin: fcore::client::FlussAdmin,
         table_info: fcore::metadata::TableInfo,
+        projected_schema: SchemaRef,
+        projected_row_type: fcore::metadata::RowType,
     ) -> Self {
         Self {
-            inner: inner_scanner,
+            inner: None,
+            inner_batch: Some(inner_batch_scanner),
             admin,
             table_info,
+            projected_schema,
+            projected_row_type,
             start_timestamp: None,
             end_timestamp: None,
         }
diff --git a/bindings/python/src/utils.rs b/bindings/python/src/utils.rs
index ee32c9c..c92f1b9 100644
--- a/bindings/python/src/utils.rs
+++ b/bindings/python/src/utils.rs
@@ -203,20 +203,15 @@ impl Utils {
         py: Python,
         batches: Vec<Arc<arrow::record_batch::RecordBatch>>,
     ) -> PyResult<Py<PyAny>> {
-        use arrow_array::RecordBatch as ArrowArrayRecordBatch;
-
         let py_batches: Result<Vec<Py<PyAny>>, _> = batches
             .iter()
             .map(|batch| {
-                ArrowArrayRecordBatch::try_new(batch.schema().clone(), 
batch.columns().to_vec())
-                    .map_err(|e| FlussError::new_err(format!("Failed to 
convert RecordBatch: {e}")))
-                    .and_then(|b| {
-                        ToPyArrow::to_pyarrow(&b, py)
-                            .map(|x| x.into())
-                            .map_err(|e| {
-                                FlussError::new_err(format!("Failed to convert 
to PyObject: {e}"))
-                            })
-                    })
+                // Just dereference the Arc - no need to recreate the batch
+                batch
+                    .as_ref()
+                    .to_pyarrow(py)
+                    .map(|x| x.into())
+                    .map_err(|e| FlussError::new_err(format!("Failed to 
convert to PyObject: {e}")))
             })
             .collect();
 


Reply via email to