Copilot commented on code in PR #151:
URL: https://github.com/apache/fluss-rust/pull/151#discussion_r2694732646
##########
bindings/python/src/table.rs:
##########
@@ -70,9 +70,112 @@ impl FlussTable {
let table_scan = fluss_table.new_scan();
let rust_scanner = table_scan.create_log_scanner().map_err(|e| {
- PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
- "Failed to create log scanner: {e:?}"
- ))
+ FlussError::new_err(format!("Failed to create log scanner:
{e}"))
+ })?;
+
+ let admin = conn
+ .get_admin()
+ .await
+ .map_err(|e| FlussError::new_err(e.to_string()))?;
+
+ let py_scanner = LogScanner::from_core(rust_scanner, admin,
table_info.clone());
+ Python::attach(|py| Py::new(py, py_scanner))
+ })
+ }
+
+ /// Create a new log scanner with column projection (by index).
+ ///
+ /// Args:
+ /// column_indices: List of column indices to include in the scan
(0-based)
+ ///
+ /// Returns:
+ /// LogScanner with projection applied
+ ///
+ /// Example:
+ /// >>> scanner = await table.new_log_scanner_with_projection([0, 2,
4])
+ pub fn new_log_scanner_with_projection<'py>(
+ &self,
+ py: Python<'py>,
+ column_indices: Vec<usize>,
+ ) -> PyResult<Bound<'py, PyAny>> {
+ // Validate early with Python-friendly error
+ if column_indices.is_empty() {
+ return Err(FlussError::new_err(
+ "column_indices cannot be empty".to_string(),
+ ));
+ }
+
+ let conn = self.connection.clone();
+ let metadata = self.metadata.clone();
+ let table_info = self.table_info.clone();
+
+ future_into_py(py, async move {
+ let fluss_table =
+ fcore::client::FlussTable::new(&conn, metadata.clone(),
table_info.clone());
+
+ let table_scan = fluss_table.new_scan();
+ let table_scan = table_scan
+ .project(&column_indices)
+ .map_err(|e| FlussError::new_err(format!("Failed to project
columns: {e}")))?;
+
+ let rust_scanner = table_scan.create_log_scanner().map_err(|e| {
+ FlussError::new_err(format!("Failed to create log scanner:
{e}"))
+ })?;
+
+ let admin = conn
+ .get_admin()
+ .await
+ .map_err(|e| FlussError::new_err(e.to_string()))?;
+
+ let py_scanner = LogScanner::from_core(rust_scanner, admin,
table_info.clone());
+ Python::attach(|py| Py::new(py, py_scanner))
+ })
Review Comment:
There is significant code duplication between `new_log_scanner`,
`new_log_scanner_with_projection`, and `new_log_scanner_with_column_names`. The
three methods share nearly identical boilerplate code for creating the
FlussTable, getting the admin, and constructing the LogScanner. Consider
extracting a private helper function that accepts an optional projection
configuration to reduce duplication and improve maintainability.
```suggestion
Self::create_log_scanner_internal(conn, metadata, table_info,
Some(column_indices))
.await
})
}
/// Internal helper to create a log scanner with an optional column
projection.
///
/// This centralizes the boilerplate for constructing the FlussTable,
creating
/// the scan, applying an optional projection, and wrapping the resulting
/// scanner for Python.
async fn create_log_scanner_internal(
conn: Arc<fcore::client::FlussConnection>,
metadata: Arc<fcore::client::Metadata>,
table_info: fcore::metadata::TableInfo,
column_indices: Option<Vec<usize>>,
) -> PyResult<Py<PyAny>> {
let fluss_table =
fcore::client::FlussTable::new(&conn, metadata.clone(),
table_info.clone());
let table_scan = fluss_table.new_scan();
let table_scan = if let Some(indices) = column_indices {
table_scan
.project(&indices)
.map_err(|e| FlussError::new_err(format!("Failed to project
columns: {e}")))?
} else {
table_scan
};
let rust_scanner = table_scan
.create_log_scanner()
.map_err(|e| FlussError::new_err(format!("Failed to create log
scanner: {e}")))?;
let admin = conn
.get_admin()
.await
.map_err(|e| FlussError::new_err(e.to_string()))?;
let py_scanner = LogScanner::from_core(rust_scanner, admin,
table_info.clone());
Python::attach(|py| Py::new(py, py_scanner))
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]