This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new d203cf9  feat: add column projection support to Python LogScanner (#151)
d203cf9 is described below

commit d203cf9f1e7a69113a23234951c8054a78d7a9e9
Author: Anton Borisov <[email protected]>
AuthorDate: Fri Jan 16 02:27:04 2026 +0000

    feat: add column projection support to Python LogScanner (#151)
---
 bindings/python/example/example.py |  22 ++++++++
 bindings/python/src/table.rs       | 112 ++++++++++++++++++++++++++++---------
 2 files changed, 109 insertions(+), 25 deletions(-)

diff --git a/bindings/python/example/example.py b/bindings/python/example/example.py
index 0523f94..0b1e67d 100644
--- a/bindings/python/example/example.py
+++ b/bindings/python/example/example.py
@@ -178,6 +178,28 @@ async def main():
     except Exception as e:
         print(f"Error during scanning: {e}")
 
+    # Demo: Column projection
+    print("\n--- Testing Column Projection ---")
+    try:
+        # Project specific columns by index
+        print("\n1. Projection by index [0, 1] (id, name):")
+        scanner_index = await table.new_log_scanner(project=[0, 1])
+        scanner_index.subscribe(None, None)
+        df_projected = scanner_index.to_pandas()
+        print(df_projected.head())
+        print(f"   Projected {df_projected.shape[1]} columns: 
{list(df_projected.columns)}")
+
+        # Project specific columns by name (Pythonic!)
+        print("\n2. Projection by name ['name', 'score'] (Pythonic):")
+        scanner_names = await table.new_log_scanner(columns=["name", "score"])
+        scanner_names.subscribe(None, None)
+        df_named = scanner_names.to_pandas()
+        print(df_named.head())
+        print(f"   Projected {df_named.shape[1]} columns: 
{list(df_named.columns)}")
+
+    except Exception as e:
+        print(f"Error during projection: {e}")
+
     # Close connection
     conn.close()
     print("\nConnection closed")
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index 8a11648..6cd13c4 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -34,6 +34,12 @@ pub struct FlussTable {
     has_primary_key: bool,
 }
 
+/// Internal enum to represent different projection types
+enum ProjectionType {
+    Indices(Vec<usize>),
+    Names(Vec<String>),
+}
+
 #[pymethods]
 impl FlussTable {
     /// Create a new append writer for the table
@@ -57,32 +63,39 @@ impl FlussTable {
         })
     }
 
-    /// Create a new log scanner for the table
-    fn new_log_scanner<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, 
PyAny>> {
-        let conn = self.connection.clone();
-        let metadata = self.metadata.clone();
-        let table_info = self.table_info.clone();
-
-        future_into_py(py, async move {
-            let fluss_table =
-                fcore::client::FlussTable::new(&conn, metadata.clone(), 
table_info.clone());
-
-            let table_scan = fluss_table.new_scan();
-
-            let rust_scanner = table_scan.create_log_scanner().map_err(|e| {
-                PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
-                    "Failed to create log scanner: {e:?}"
-                ))
-            })?;
-
-            let admin = conn
-                .get_admin()
-                .await
-                .map_err(|e| FlussError::new_err(e.to_string()))?;
+    /// Create a new log scanner for the table.
+    ///
+    /// Args:
+    ///     project: Optional list of column indices (0-based) to include in the scan.
+    ///     columns: Optional list of column names to include in the scan.
+    ///
+    /// Returns:
+    ///     LogScanner, optionally with projection applied
+    ///
+    /// Note:
+    ///     Specify only one of 'project' or 'columns'.
+    ///     If neither is specified, all columns are included.
+    ///     Rust side will validate the projection parameters.
+    ///
+    #[pyo3(signature = (project=None, columns=None))]
+    pub fn new_log_scanner<'py>(
+        &self,
+        py: Python<'py>,
+        project: Option<Vec<usize>>,
+        columns: Option<Vec<String>>,
+    ) -> PyResult<Bound<'py, PyAny>> {
+        let projection = match (project, columns) {
+            (Some(_), Some(_)) => {
+                return Err(FlussError::new_err(
+                    "Specify only one of 'project' or 'columns'".to_string(),
+                ));
+            }
+            (Some(indices), None) => Some(ProjectionType::Indices(indices)),
+            (None, Some(names)) => Some(ProjectionType::Names(names)),
+            (None, None) => None,
+        };
 
-            let py_scanner = LogScanner::from_core(rust_scanner, admin, 
table_info.clone());
-            Python::attach(|py| Py::new(py, py_scanner))
-        })
+        self.create_log_scanner_internal(py, projection)
     }
 
     /// Get table information
@@ -126,6 +139,55 @@ impl FlussTable {
             has_primary_key,
         }
     }
+
+    /// Internal helper to create log scanner with optional projection
+    fn create_log_scanner_internal<'py>(
+        &self,
+        py: Python<'py>,
+        projection: Option<ProjectionType>,
+    ) -> PyResult<Bound<'py, PyAny>> {
+        let conn = self.connection.clone();
+        let metadata = self.metadata.clone();
+        let table_info = self.table_info.clone();
+
+        future_into_py(py, async move {
+            let fluss_table =
+                fcore::client::FlussTable::new(&conn, metadata.clone(), 
table_info.clone());
+
+            let mut table_scan = fluss_table.new_scan();
+
+            // Apply projection if specified
+            if let Some(proj) = projection {
+                table_scan = match proj {
+                    ProjectionType::Indices(indices) => {
+                        table_scan.project(&indices).map_err(|e| {
+                            FlussError::new_err(format!("Failed to project 
columns: {e}"))
+                        })?
+                    }
+                    ProjectionType::Names(names) => {
+                        // Convert Vec<String> to Vec<&str> for the API
+                        let column_name_refs: Vec<&str> =
+                            names.iter().map(|s| s.as_str()).collect();
+                        
table_scan.project_by_name(&column_name_refs).map_err(|e| {
+                            FlussError::new_err(format!("Failed to project 
columns: {e}"))
+                        })?
+                    }
+                };
+            }
+
+            let rust_scanner = table_scan
+                .create_log_scanner()
+                .map_err(|e| FlussError::new_err(format!("Failed to create log 
scanner: {e}")))?;
+
+            let admin = conn
+                .get_admin()
+                .await
+                .map_err(|e| FlussError::new_err(e.to_string()))?;
+
+            let py_scanner = LogScanner::from_core(rust_scanner, admin, 
table_info.clone());
+            Python::attach(|py| Py::new(py, py_scanner))
+        })
+    }
 }
 
 /// Writer for appending data to a Fluss table

Reply via email to