fresh-borzoni commented on code in PR #438:
URL: https://github.com/apache/fluss-rust/pull/438#discussion_r2943512062
##########
bindings/python/src/table.rs:
##########
@@ -2167,13 +2165,16 @@ impl LogScanner {
/// Returns:
/// PyArrow Table containing all data from subscribed buckets
fn to_arrow(&self, py: Python) -> PyResult<Py<PyAny>> {
- let scanner = self.scanner.as_batch()?;
- let subscribed = scanner.get_subscribed_buckets();
- if subscribed.is_empty() {
- return Err(FlussError::new_err(
- "No buckets subscribed. Call subscribe(), subscribe_buckets(), subscribe_partition(), or subscribe_partition_buckets() first.",
- ));
- }
+ let subscribed = {
+ let scanner = self.kind.as_batch()?;
+ let subs = scanner.get_subscribed_buckets();
+ if subs.is_empty() {
+ return Err(FlussError::new_err(
+ "No buckets subscribed. Call subscribe(), subscribe_buckets(), subscribe_partition(), or subscribe_partition_buckets() first.",
+ ));
+ }
+ subs.clone()
Review Comment:
nit: the scoping block and `subs.clone()` were needed with the `Mutex`, but not with `Arc`: all borrows are shared now.
##########
bindings/python/src/table.rs:
##########
@@ -2199,6 +2200,171 @@ impl LogScanner {
Ok(df)
}
+ fn __aiter__<'py>(slf: PyRef<'py, Self>) -> PyResult<Bound<'py, PyAny>> {
Review Comment:
I think it's better to leave `_async_poll` and `_async_poll_batches` out of the `.pyi` stub, because these methods should ideally be private implementation details.
Exposing `__aiter__` makes sense, since it signals to IDEs that we support `async for`, but we don't want to encourage users to call the other underscore methods directly.
##########
bindings/python/src/table.rs:
##########
@@ -2199,6 +2200,171 @@ impl LogScanner {
Ok(df)
}
+ fn __aiter__<'py>(slf: PyRef<'py, Self>) -> PyResult<Bound<'py, PyAny>> {
+ let py = slf.py();
+
+ match slf.kind.as_ref() {
+ ScannerKind::Record(_) => {
+ static RECORD_ASYNC_GEN_FN: PyOnceLock<Py<PyAny>> = PyOnceLock::new();
+ let gen_fn = RECORD_ASYNC_GEN_FN.get_or_init(py, || {
+ let code = pyo3::ffi::c_str!(
+ r#"
+async def _async_scan(scanner, timeout_ms=1000):
+ while True:
+ batch = await scanner._async_poll(timeout_ms)
+ if batch:
+ for record in batch:
+ yield record
+"#
+ );
+ let globals = pyo3::types::PyDict::new(py);
+ py.run(code, Some(&globals), None).unwrap();
+ globals.get_item("_async_scan").unwrap().unwrap().unbind()
+ });
+ gen_fn.bind(py).call1((slf.into_bound_py_any(py)?,))
+ }
+ ScannerKind::Batch(_) => {
+ static BATCH_ASYNC_GEN_FN: PyOnceLock<Py<PyAny>> = PyOnceLock::new();
+ let gen_fn = BATCH_ASYNC_GEN_FN.get_or_init(py, || {
+ let code = pyo3::ffi::c_str!(
+ r#"
+async def _async_batch_scan(scanner, timeout_ms=1000):
Review Comment:
The two `__aiter__` branches are identical except for the poll method name.
You can collapse them into a single `PyOnceLock` and one generator that takes a callable, then dispatch by passing `_async_poll` or `_async_poll_batches`.
--
This is an automated message from the Apache Git Service.