This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new d203cf9 feat: add column projection support to Python LogScanner (#151)
d203cf9 is described below
commit d203cf9f1e7a69113a23234951c8054a78d7a9e9
Author: Anton Borisov <[email protected]>
AuthorDate: Fri Jan 16 02:27:04 2026 +0000
feat: add column projection support to Python LogScanner (#151)
---
bindings/python/example/example.py | 22 ++++++++
bindings/python/src/table.rs | 112 ++++++++++++++++++++++++++++---------
2 files changed, 109 insertions(+), 25 deletions(-)
diff --git a/bindings/python/example/example.py b/bindings/python/example/example.py
index 0523f94..0b1e67d 100644
--- a/bindings/python/example/example.py
+++ b/bindings/python/example/example.py
@@ -178,6 +178,28 @@ async def main():
except Exception as e:
print(f"Error during scanning: {e}")
+ # Demo: Column projection
+ print("\n--- Testing Column Projection ---")
+ try:
+ # Project specific columns by index
+ print("\n1. Projection by index [0, 1] (id, name):")
+ scanner_index = await table.new_log_scanner(project=[0, 1])
+ scanner_index.subscribe(None, None)
+ df_projected = scanner_index.to_pandas()
+ print(df_projected.head())
+ print(f" Projected {df_projected.shape[1]} columns: {list(df_projected.columns)}")
+
+ # Project specific columns by name (Pythonic!)
+ print("\n2. Projection by name ['name', 'score'] (Pythonic):")
+ scanner_names = await table.new_log_scanner(columns=["name", "score"])
+ scanner_names.subscribe(None, None)
+ df_named = scanner_names.to_pandas()
+ print(df_named.head())
+ print(f" Projected {df_named.shape[1]} columns: {list(df_named.columns)}")
+
+ except Exception as e:
+ print(f"Error during projection: {e}")
+
# Close connection
conn.close()
print("\nConnection closed")
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index 8a11648..6cd13c4 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -34,6 +34,12 @@ pub struct FlussTable {
has_primary_key: bool,
}
+/// Internal enum to represent different projection types
+enum ProjectionType {
+ Indices(Vec<usize>),
+ Names(Vec<String>),
+}
+
#[pymethods]
impl FlussTable {
/// Create a new append writer for the table
@@ -57,32 +63,39 @@ impl FlussTable {
})
}
- /// Create a new log scanner for the table
- fn new_log_scanner<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
- let conn = self.connection.clone();
- let metadata = self.metadata.clone();
- let table_info = self.table_info.clone();
-
- future_into_py(py, async move {
- let fluss_table =
- fcore::client::FlussTable::new(&conn, metadata.clone(), table_info.clone());
-
- let table_scan = fluss_table.new_scan();
-
- let rust_scanner = table_scan.create_log_scanner().map_err(|e| {
- PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
- "Failed to create log scanner: {e:?}"
- ))
- })?;
-
- let admin = conn
- .get_admin()
- .await
- .map_err(|e| FlussError::new_err(e.to_string()))?;
+ /// Create a new log scanner for the table.
+ ///
+ /// Args:
+ /// project: Optional list of column indices (0-based) to include in the scan.
+ /// columns: Optional list of column names to include in the scan.
+ ///
+ /// Returns:
+ /// LogScanner, optionally with projection applied
+ ///
+ /// Note:
+ /// Specify only one of 'project' or 'columns'.
+ /// If neither is specified, all columns are included.
+ /// Rust side will validate the projection parameters.
+ ///
+ #[pyo3(signature = (project=None, columns=None))]
+ pub fn new_log_scanner<'py>(
+ &self,
+ py: Python<'py>,
+ project: Option<Vec<usize>>,
+ columns: Option<Vec<String>>,
+ ) -> PyResult<Bound<'py, PyAny>> {
+ let projection = match (project, columns) {
+ (Some(_), Some(_)) => {
+ return Err(FlussError::new_err(
+ "Specify only one of 'project' or 'columns'".to_string(),
+ ));
+ }
+ (Some(indices), None) => Some(ProjectionType::Indices(indices)),
+ (None, Some(names)) => Some(ProjectionType::Names(names)),
+ (None, None) => None,
+ };
- let py_scanner = LogScanner::from_core(rust_scanner, admin, table_info.clone());
- Python::attach(|py| Py::new(py, py_scanner))
- })
+ self.create_log_scanner_internal(py, projection)
}
/// Get table information
@@ -126,6 +139,55 @@ impl FlussTable {
has_primary_key,
}
}
+
+ /// Internal helper to create log scanner with optional projection
+ fn create_log_scanner_internal<'py>(
+ &self,
+ py: Python<'py>,
+ projection: Option<ProjectionType>,
+ ) -> PyResult<Bound<'py, PyAny>> {
+ let conn = self.connection.clone();
+ let metadata = self.metadata.clone();
+ let table_info = self.table_info.clone();
+
+ future_into_py(py, async move {
+ let fluss_table =
+ fcore::client::FlussTable::new(&conn, metadata.clone(), table_info.clone());
+
+ let mut table_scan = fluss_table.new_scan();
+
+ // Apply projection if specified
+ if let Some(proj) = projection {
+ table_scan = match proj {
+ ProjectionType::Indices(indices) => {
+ table_scan.project(&indices).map_err(|e| {
+ FlussError::new_err(format!("Failed to project columns: {e}"))
+ })?
+ }
+ ProjectionType::Names(names) => {
+ // Convert Vec<String> to Vec<&str> for the API
+ let column_name_refs: Vec<&str> =
+ names.iter().map(|s| s.as_str()).collect();
+ table_scan.project_by_name(&column_name_refs).map_err(|e| {
+ FlussError::new_err(format!("Failed to project columns: {e}"))
+ })?
+ }
+ };
+ }
+
+ let rust_scanner = table_scan
+ .create_log_scanner()
+ .map_err(|e| FlussError::new_err(format!("Failed to create log scanner: {e}")))?;
+
+ let admin = conn
+ .get_admin()
+ .await
+ .map_err(|e| FlussError::new_err(e.to_string()))?;
+
+ let py_scanner = LogScanner::from_core(rust_scanner, admin, table_info.clone());
+ Python::attach(|py| Py::new(py, py_scanner))
+ })
+ }
}
/// Writer for appending data to a Fluss table