leekeiabstraction commented on code in PR #151:
URL: https://github.com/apache/fluss-rust/pull/151#discussion_r2680176217
##########
bindings/python/example/example.py:
##########
@@ -178,6 +178,28 @@ async def main():
except Exception as e:
print(f"Error during scanning: {e}")
+ # Demo: Column projection
+ print("\n--- Testing Column Projection ---")
+ try:
+ # Project specific columns by index (C++ parity)
+ print("\n1. Projection by index [0, 1] (id, name):")
+ scanner_index = await table.new_log_scanner_with_projection([0, 1])
+ scanner_index.subscribe(None, None)
+ df_projected = scanner_index.to_pandas()
+ print(df_projected.head())
+ print(f" Projected {df_projected.shape[1]} columns: {list(df_projected.columns)}")
+
+ # Project specific columns by name (Python-specific, more idiomatic!)
+ print("\n2. Projection by name ['name', 'score'] (Pythonic):")
+ scanner_names = await table.new_log_scanner_with_column_names(["name", "score"])
+ scanner_names.subscribe(None, None)
+ df_named = scanner_names.to_pandas()
+ print(df_named.head())
+ print(f" Projected {df_named.shape[1]} columns: {list(df_named.columns)}")
Review Comment:
Should the polling part also be included (as in the C++ example)?
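The two projection calls in this hunk differ only in how columns are identified: by 0-based index or by name. A minimal pure-Python sketch of how name-based projection can be reduced to the index-based path. `resolve_projection` is a hypothetical helper (not part of the PR), and the table schema is assumed to be just an ordered list of column names such as `["id", "name", "score"]`:

```python
from typing import List, Sequence


def resolve_projection(schema: Sequence[str], column_names: Sequence[str]) -> List[int]:
    """Map column names to 0-based indices, preserving the requested order.

    Hypothetical helper: `schema` is assumed to be the table's column
    names in declaration order.
    """
    if not column_names:
        # Mirrors the empty-list check in new_log_scanner_with_projection.
        raise ValueError("column_names cannot be empty")
    positions = {name: i for i, name in enumerate(schema)}
    # Report every unknown name at once instead of failing on the first.
    missing = [name for name in column_names if name not in positions]
    if missing:
        raise ValueError(f"unknown columns: {missing}")
    return [positions[name] for name in column_names]


print(resolve_projection(["id", "name", "score"], ["name", "score"]))  # [1, 2]
```

Resolving names up front keeps a single projection code path and lets an unknown name fail with a clear message before any scanner is created.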
##########
bindings/python/src/table.rs:
##########
@@ -85,6 +85,111 @@ impl FlussTable {
})
}
+ /// Create a new log scanner with column projection (by index).
+ ///
+ /// Args:
+ /// column_indices: List of column indices to include in the scan (0-based)
+ ///
+ /// Returns:
+ /// LogScanner with projection applied
+ ///
+ /// Example:
+ /// >>> scanner = await table.new_log_scanner_with_projection([0, 2, 4])
+ pub fn new_log_scanner_with_projection<'py>(
+ &self,
+ py: Python<'py>,
+ column_indices: Vec<usize>,
+ ) -> PyResult<Bound<'py, PyAny>> {
+ // Validate early with Python-friendly error
+ if column_indices.is_empty() {
+ return Err(FlussError::new_err(
+ "column_indices cannot be empty".to_string(),
+ ));
+ }
+
+ let conn = self.connection.clone();
+ let metadata = self.metadata.clone();
+ let table_info = self.table_info.clone();
+
+ future_into_py(py, async move {
+ let fluss_table =
+ fcore::client::FlussTable::new(&conn, metadata.clone(), table_info.clone());
+
+ let table_scan = fluss_table.new_scan();
+ let table_scan = table_scan
+ .project(&column_indices)
+ .map_err(|e| FlussError::new_err(format!("Failed to project columns: {e}")))?;
+
+ let rust_scanner = table_scan.create_log_scanner().map_err(|e| {
+ FlussError::new_err(format!("Failed to create log scanner: {e}"))
+ })?;
Review Comment:
We seem to use both FlussError and PyErr within this file; for example, lines 72 to 75 use PyErr. Can you clarify when each should be used?
```
let rust_scanner = table_scan.create_log_scanner().map_err(|e| {
PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
"Failed to create log scanner: {e:?}"
))
})?;
```
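One practical reason to settle on a single convention is that Python callers see the difference. In this minimal pure-Python sketch, `FlussError` is a local stand-in for the binding's exception class (assumed to derive from `Exception`), and the two `create_scanner_*` functions are hypothetical. It shows that an `except FlussError` handler does not catch an error raised as `PyRuntimeError`, which surfaces in Python as a plain `RuntimeError`:

```python
class FlussError(Exception):
    """Local stand-in for the binding's exception class (an assumption)."""


def create_scanner_fluss_error() -> None:
    # Hypothetical call site that reports failures via FlussError::new_err.
    raise FlussError("Failed to create log scanner")


def create_scanner_runtime_error() -> None:
    # Hypothetical call site that reports failures via PyRuntimeError.
    raise RuntimeError("Failed to create log scanner")


def caught_by_fluss_handler(create) -> bool:
    """Return True if an `except FlussError` block handles the failure."""
    try:
        create()
    except FlussError:
        return True
    except Exception:
        # Any other exception type slips past the FlussError handler.
        return False
    return False


print(caught_by_fluss_handler(create_scanner_fluss_error))    # True
print(caught_by_fluss_handler(create_scanner_runtime_error))  # False
```

Even if `FlussError` were declared as a subclass of `RuntimeError`, a plain `RuntimeError` would still not match `except FlussError`, so mixing the two leaves callers needing to catch both.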