Copilot commented on code in PR #487:
URL: https://github.com/apache/fluss-rust/pull/487#discussion_r3067375865


##########
bindings/python/example/example.py:
##########
@@ -40,9 +40,8 @@ async def main():
     config = fluss.Config(config_spec)
 
     # Create connection using the static create method
-    conn = await fluss.FlussConnection.create(config)
-
-    # Define fields for PyArrow
+    async with await fluss.FlussConnection.create(config) as conn:
+        # Define fields for PyArrow
     fields = [
         pa.field("id", pa.int32()),

Review Comment:
   The `async with await FlussConnection.create(...) as conn:` block has no
executable statement in its body (only a comment), and `fields = [...]` is
dedented. As written, the file fails to parse with an IndentationError (a
SyntaxError subclass: "expected an indented block"). Even once the block has
a body, leaving the setup logic dedented would exit the connection context
immediately. Please indent the subsequent setup logic under the `async with`
so the connection remains open for the rest of `main()`.
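A minimal sketch of the intended shape, using a hypothetical `FakeConnection` as a stand-in for `fluss.FlussConnection` (its `create` and exit behavior here are assumptions for illustration, not the binding's code): everything that uses `conn` sits inside the `async with` body, so the connection is only released after that work is done.

```python
import asyncio


class FakeConnection:
    """Hypothetical stand-in for fluss.FlussConnection (illustration only)."""

    def __init__(self):
        self.closed = False

    @classmethod
    async def create(cls, config):
        # Same call shape as `await fluss.FlussConnection.create(config)`.
        return cls()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.closed = True  # the connection's lifetime ends here
        return False  # do not swallow exceptions


async def main(config):
    observed = []
    async with await FakeConnection.create(config) as conn:
        # Everything that uses `conn` is indented under the `async with`.
        fields = ["id", "name"]
        observed.append(conn.closed)  # still open inside the block
        observed.append(len(fields))
    return conn, observed


conn, observed = asyncio.run(main(config={}))
```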



##########
bindings/python/src/connection.rs:
##########
@@ -104,6 +104,28 @@ impl FlussConnection {
         Ok(false)
     }
 
+    // Enter the async runtime context (for 'async with' statement)
+    fn __aenter__<'py>(slf: PyRef<'py, Self>, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
+        let py_slf = slf.into_pyobject(py)?.unbind();
+        future_into_py(py, async move { Ok(py_slf) })
+    }
+
+    // Exit the async runtime context (for 'async with' statement)
+    #[pyo3(signature = (_exc_type=None, _exc_value=None, _traceback=None))]
+    fn __aexit__<'py>(
+        &mut self,
+        py: Python<'py>,
+        _exc_type: Option<Bound<'py, PyAny>>,
+        _exc_value: Option<Bound<'py, PyAny>>,
+        _traceback: Option<Bound<'py, PyAny>>,
+    ) -> PyResult<Bound<'py, PyAny>> {
+        future_into_py(py, async move {
+            // In the future, we could call an async close on the core connection here
+            // e.g., client.close().await;
+            Ok(false)
+        })
+    }

Review Comment:
   `__exit__` calls `self.close()?`, but the new async context manager 
`__aexit__` does not. This means `async with await FlussConnection.create(...)` 
will not close the connection (even if `close()` is implemented later). Please 
mirror `__exit__` by calling `self.close()?` in `__aexit__` (it can be done 
before creating the future since `close()` is synchronous).
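The Rust change is the one-liner described above; the pure-Python model below only illustrates the expected behavior. The `Connection` class is hypothetical: `__aexit__` calls the same synchronous `close()` that `__exit__` uses and returns `False`, so an exception raised inside the block still propagates after cleanup.

```python
import asyncio


class Connection:
    """Hypothetical connection supporting both `with` and `async with`."""

    def __init__(self):
        self.close_calls = 0

    def close(self):
        # Synchronous, so both exit paths can call it directly.
        self.close_calls += 1

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Mirror __exit__: close before returning, then return False so
        # any exception raised in the block still propagates.
        self.close()
        return False


async def exercise_async():
    conn = Connection()
    try:
        async with conn:
            raise RuntimeError("boom")
    except RuntimeError:
        pass
    return conn


async_conn = asyncio.run(exercise_async())

sync_conn = Connection()
with sync_conn:
    pass
```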



##########
bindings/python/src/upsert.rs:
##########
@@ -97,6 +97,7 @@ impl UpsertWriter {
     ///
     /// Returns:
     ///     None on success
+    /// Flush any pending data

Review Comment:
   There’s an extra doc line `/// Flush any pending data` inserted after the 
`Returns:` section for `flush()`. It’s redundant with the existing docstring 
and makes the generated docs read oddly. Please remove this stray line or 
integrate it into the main flush doc comment above.
   ```suggestion
   
   ```



##########
bindings/python/test/test_context_manager.py:
##########
@@ -0,0 +1,186 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import asyncio
+import pytest
+import pyarrow as pa
+import time
+import fluss
+
+def _poll_records(scanner, expected_count, timeout_s=10):
+    """Poll a record-based scanner until expected_count records are collected."""
+    collected = []
+    deadline = time.monotonic() + timeout_s
+    while len(collected) < expected_count and time.monotonic() < deadline:
+        records = scanner.poll(5000)
+        collected.extend(records)
+    return collected
+
+@pytest.mark.asyncio
+async def test_connection_context_manager(plaintext_bootstrap_servers):
+    config = fluss.Config({"bootstrap.servers": plaintext_bootstrap_servers})
+    async with await fluss.FlussConnection.create(config) as conn:
+        admin = conn.get_admin()
+        nodes = await admin.get_server_nodes()
+        assert len(nodes) > 0
+    # conn should be closed (though currently close is a no-op in python side, but verifies syntax)
+
+@pytest.mark.asyncio
+async def test_append_writer_success_flush(connection, admin):
+    table_path = fluss.TablePath("fluss", "test_append_ctx_success")
+    await admin.drop_table(table_path, ignore_if_not_exists=True)
+    
+    schema = fluss.Schema(pa.schema([pa.field("a", pa.int32())]))
+    await admin.create_table(table_path, fluss.TableDescriptor(schema))
+    
+    table = await connection.get_table(table_path)
+    
+    async with table.new_append().create_writer() as writer:
+        writer.append({"a": 1})
+        writer.append({"a": 2})
+        # No explicit flush here
+        
+    # After context exit, data should be flushed
+    scanner = await table.new_scan().create_log_scanner()
+    scanner.subscribe(0, fluss.EARLIEST_OFFSET)
+    records = _poll_records(scanner, expected_count=2)
+    assert len(records) == 2
+    assert sorted([r.row["a"] for r in records]) == [1, 2]
+
+@pytest.mark.asyncio
+async def test_append_writer_exception_no_flush(connection, admin):
+    table_path = fluss.TablePath("fluss", "test_append_ctx_fail")
+    await admin.drop_table(table_path, ignore_if_not_exists=True)
+    
+    schema = fluss.Schema(pa.schema([pa.field("a", pa.int32())]))
+    await admin.create_table(table_path, fluss.TableDescriptor(schema))
+    table = await connection.get_table(table_path)
+    
+    class TestException(Exception): pass
+    
+    start_time = time.perf_counter()
+    try:
+        async with table.new_append().create_writer() as writer:
+            writer.append({"a": 100})
+            raise TestException("abort")
+    except TestException:
+        pass
+    duration = time.perf_counter() - start_time
+    
+    # Verification:
+    # 1. The exception was propagated immediately.
+    # 2. The block exited nearly instantly because it bypassed the network flush.
+    assert duration < 0.1, f"Context exit took too long ({duration:.3f}s), likely performed a flush"
+

Review Comment:
   The test asserts the whole `async with` block completes in <0.1s. On 
slower/loaded CI machines this timing assertion can be flaky even when 
`flush()` is correctly skipped (context manager overhead + scheduling jitter). 
Consider using a more tolerant threshold, or assert behavior via 
mocking/observability (e.g., verifying `flush()` was not awaited / no records 
are guaranteed to be acknowledged) rather than wall-clock timing.
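One way to assert behavior instead of wall-clock time is to observe whether `flush()` was awaited. `FlushOnSuccess` below is a hypothetical Python wrapper that reproduces this PR's `__aexit__` rule so it can run without a cluster; against the real binding, a seam to observe the core writer's flush would be needed, so treat this as an illustration of the assertion style rather than a drop-in test. `MagicMock`, `AsyncMock`, `assert_awaited_once` and `assert_not_awaited` are standard `unittest.mock` APIs.

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock


class FlushOnSuccess:
    """Hypothetical wrapper reproducing the PR's __aexit__ rule in Python."""

    def __init__(self, writer):
        self.writer = writer

    async def __aenter__(self):
        return self.writer

    async def __aexit__(self, exc_type, exc, tb):
        if exc_type is None:
            await self.writer.flush()  # success path only
        return False


def make_writer():
    writer = MagicMock()  # append() stays a plain synchronous call
    writer.flush = AsyncMock()  # flush() must be awaited
    return writer


class AbortTest(Exception):
    pass


async def run_both_paths():
    ok_writer = make_writer()
    async with FlushOnSuccess(ok_writer) as w:
        w.append({"a": 1})

    failed_writer = make_writer()
    try:
        async with FlushOnSuccess(failed_writer) as w:
            w.append({"a": 100})
            raise AbortTest("abort")
    except AbortTest:
        pass
    return ok_writer, failed_writer


ok_writer, failed_writer = asyncio.run(run_both_paths())
ok_writer.flush.assert_awaited_once()
failed_writer.flush.assert_not_awaited()
```

Because the check is on what was awaited, it does not depend on CI scheduling jitter.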



##########
bindings/python/test/test_log_table.py:
##########
@@ -143,6 +143,7 @@ async def test_list_offsets(connection, admin):
     assert latest[0] == 0
 
     before_append_ms = int(time.time() * 1000)
+    await asyncio.sleep(0.1)

Review Comment:
   Using a fixed `await asyncio.sleep(0.1)` to separate `before_append_ms` from 
the subsequent writes adds an arbitrary delay and can still be fragile under 
clock granularity/skew. A more deterministic approach is to wait until the 
millisecond clock advances (e.g., loop until `int(time.time()*1000) > 
before_append_ms`) before appending, which avoids hard-coding a 100ms sleep.
   ```suggestion
       while int(time.time() * 1000) <= before_append_ms:
           await asyncio.sleep(0)
   ```
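The suggested loop can be factored into a small helper; `now_ms` and `wait_for_next_ms` are hypothetical names, not part of the test suite. Each iteration yields to the event loop with `asyncio.sleep(0)`, so, assuming the wall clock does not step backwards, the wait is typically about one millisecond rather than a fixed 100 ms.

```python
import asyncio
import time


def now_ms() -> int:
    """Wall-clock time in whole milliseconds, computed as in the test."""
    return int(time.time() * 1000)


async def wait_for_next_ms(reference_ms: int) -> int:
    """Yield to the event loop until the millisecond clock passes reference_ms."""
    current = now_ms()
    while current <= reference_ms:
        await asyncio.sleep(0)
        current = now_ms()
    return current


async def demo():
    before_append_ms = now_ms()
    after = await wait_for_next_ms(before_append_ms)
    return before_append_ms, after


before, after = asyncio.run(demo())
```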



##########
bindings/python/src/table.rs:
##########
@@ -2331,6 +2362,32 @@ async def _async_scan_generic(scanner, method_name):
     fn __repr__(&self) -> String {
         format!("LogScanner(table={})", self.table_info.table_path)
     }
+
+    /// Close the scanner
+    pub fn close(&self) -> PyResult<()> {
+        Ok(())
+    }
+
+    // Enter the async runtime context (for 'async with' statement)
+    fn __aenter__<'py>(slf: PyRef<'py, Self>, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
+        let py_slf = slf.into_pyobject(py)?.unbind();
+        future_into_py(py, async move { Ok(py_slf) })
+    }
+
+    // Exit the async runtime context (for 'async with' statement)
+    #[pyo3(signature = (_exc_type=None, _exc_value=None, _traceback=None))]
+    fn __aexit__<'py>(
+        &self,
+        py: Python<'py>,
+        _exc_type: Option<Bound<'py, PyAny>>,
+        _exc_value: Option<Bound<'py, PyAny>>,
+        _traceback: Option<Bound<'py, PyAny>>,
+    ) -> PyResult<Bound<'py, PyAny>> {
+        future_into_py(py, async move {
+            // In the future, we can call an async close on the core scanner here
+            Ok(false)
+        })

Review Comment:
   `LogScanner.close()` is currently a no-op, and `__aexit__` doesn’t call it. 
This makes `async with ... as scanner:` misleading because it doesn’t actually 
perform any cleanup. If the core scanner doesn’t need cleanup, consider either 
documenting `close()` as a no-op or removing it; otherwise, implement actual 
cleanup and invoke it from `__aexit__` (and potentially from `__del__` for 
safety).
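If the scanner does need cleanup, the shape requested here might look like the following pure-Python sketch. `ScannerHandle` and its `poll`/`close` behavior are hypothetical, not the binding's API: `close()` is idempotent, `__aexit__` invokes it, and `poll()` fails fast afterwards so use after the block is visible instead of silently working.

```python
import asyncio


class ScannerHandle:
    """Hypothetical scanner whose lifecycle ends when close() is called."""

    def __init__(self):
        self._closed = False
        self.release_count = 0

    @property
    def closed(self) -> bool:
        return self._closed

    def poll(self, timeout_ms: int) -> list:
        if self._closed:
            raise RuntimeError("scanner is closed")
        return []  # a real scanner would return fetched records here

    def close(self) -> None:
        # Idempotent: a second close() must not release resources twice.
        if not self._closed:
            self._closed = True
            self.release_count += 1

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.close()
        return False


async def demo():
    async with ScannerHandle() as scanner:
        scanner.poll(5000)
    scanner.close()  # second call is a no-op
    try:
        scanner.poll(5000)
    except RuntimeError as err:
        return scanner, str(err)
    return scanner, None


scanner, error = asyncio.run(demo())
```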



##########
bindings/python/test/test_context_manager.py:
##########
@@ -0,0 +1,186 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import asyncio
+import pytest
+import pyarrow as pa
+import time
+import fluss
+
+def _poll_records(scanner, expected_count, timeout_s=10):
+    """Poll a record-based scanner until expected_count records are collected."""
+    collected = []
+    deadline = time.monotonic() + timeout_s
+    while len(collected) < expected_count and time.monotonic() < deadline:
+        records = scanner.poll(5000)
+        collected.extend(records)
+    return collected
+
+@pytest.mark.asyncio
+async def test_connection_context_manager(plaintext_bootstrap_servers):
+    config = fluss.Config({"bootstrap.servers": plaintext_bootstrap_servers})
+    async with await fluss.FlussConnection.create(config) as conn:
+        admin = conn.get_admin()
+        nodes = await admin.get_server_nodes()
+        assert len(nodes) > 0
+    # conn should be closed (though currently close is a no-op in python side, but verifies syntax)
+
+@pytest.mark.asyncio
+async def test_append_writer_success_flush(connection, admin):
+    table_path = fluss.TablePath("fluss", "test_append_ctx_success")
+    await admin.drop_table(table_path, ignore_if_not_exists=True)
+    
+    schema = fluss.Schema(pa.schema([pa.field("a", pa.int32())]))
+    await admin.create_table(table_path, fluss.TableDescriptor(schema))
+    
+    table = await connection.get_table(table_path)
+    
+    async with table.new_append().create_writer() as writer:
+        writer.append({"a": 1})
+        writer.append({"a": 2})
+        # No explicit flush here
+        
+    # After context exit, data should be flushed
+    scanner = await table.new_scan().create_log_scanner()
+    scanner.subscribe(0, fluss.EARLIEST_OFFSET)
+    records = _poll_records(scanner, expected_count=2)
+    assert len(records) == 2
+    assert sorted([r.row["a"] for r in records]) == [1, 2]
+
+@pytest.mark.asyncio
+async def test_append_writer_exception_no_flush(connection, admin):
+    table_path = fluss.TablePath("fluss", "test_append_ctx_fail")
+    await admin.drop_table(table_path, ignore_if_not_exists=True)
+    
+    schema = fluss.Schema(pa.schema([pa.field("a", pa.int32())]))
+    await admin.create_table(table_path, fluss.TableDescriptor(schema))
+    table = await connection.get_table(table_path)
+    
+    class TestException(Exception): pass
+    
+    start_time = time.perf_counter()
+    try:
+        async with table.new_append().create_writer() as writer:
+            writer.append({"a": 100})
+            raise TestException("abort")
+    except TestException:
+        pass
+    duration = time.perf_counter() - start_time
+    
+    # Verification:
+    # 1. The exception was propagated immediately.
+    # 2. The block exited nearly instantly because it bypassed the network flush.
+    assert duration < 0.1, f"Context exit took too long ({duration:.3f}s), likely performed a flush"
+
+    # NOTE: Records may still eventually arrive because of the background sender threads.
+    # We don't assert 0 records here because Fluss does not support true transactional rollback.
+
+@pytest.mark.asyncio
+async def test_upsert_writer_context_manager(connection, admin):
+    table_path = fluss.TablePath("fluss", "test_upsert_ctx")
+    await admin.drop_table(table_path, ignore_if_not_exists=True)
+    
+    schema = fluss.Schema(pa.schema([pa.field("id", pa.int32()), pa.field("v", pa.string())]), primary_keys=["id"])
+    await admin.create_table(table_path, fluss.TableDescriptor(schema))
+    
+    table = await connection.get_table(table_path)
+    
+    # Success path: verify it flushes
+    async with table.new_upsert().create_writer() as writer:
+        writer.upsert({"id": 1, "v": "a"})
+        
+    lookuper = table.new_lookup().create_lookuper()
+    res = await lookuper.lookup({"id": 1})
+    assert res is not None
+    assert res["v"] == "a"
+    
+    # Failure path: verify it bypasses flush
+    class TestException(Exception): pass
+    start_time = time.perf_counter()
+    try:
+        async with table.new_upsert().create_writer() as writer:
+            writer.upsert({"id": 2, "v": "b"})
+            raise TestException("abort")
+    except TestException:
+        pass
+    duration = time.perf_counter() - start_time
+    assert duration < 0.1, f"Context exit took too long ({duration:.3f}s), likely performed a flush"
+

Review Comment:
   Same concern as the append-writer timing test: asserting `duration < 0.1` 
for the exception path is prone to CI flakiness and may fail due to scheduler 
jitter unrelated to flushing. Prefer a less brittle threshold or a behavioral 
assertion that doesn’t depend on wall-clock timing.



##########
bindings/python/src/table.rs:
##########
@@ -989,6 +989,37 @@ impl AppendWriter {
         })
     }
 
+    // Enter the async runtime context (for 'async with' statement)
+    fn __aenter__<'py>(slf: PyRef<'py, Self>, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
+        let py_slf = slf.into_pyobject(py)?.unbind();
+        future_into_py(py, async move { Ok(py_slf) })
+    }
+
+    // Exit the async runtime context (for 'async with' statement)
+    /// On successful exit, the writer is automatically flushed.
+    /// If an exception occurs, the flush is skipped to allow immediate error
+    /// propagation, though pending records may still be sent in the background.
+    #[pyo3(signature = (exc_type=None, _exc_value=None, _traceback=None))]
+    fn __aexit__<'py>(
+        &self,
+        py: Python<'py>,
+        exc_type: Option<Bound<'py, PyAny>>,
+        _exc_value: Option<Bound<'py, PyAny>>,
+        _traceback: Option<Bound<'py, PyAny>>,
+    ) -> PyResult<Bound<'py, PyAny>> {
+        let has_error = exc_type.is_some();
+        let inner = self.inner.clone();
+        future_into_py(py, async move {
+            if !has_error {
+                inner
+                    .flush()
+                    .await
+                    .map_err(|e| FlussError::from_core_error(&e))?;
+            }
+            Ok(false)
+        })
+    }

Review Comment:
   `AppendWriter.__aexit__` only flushes on the success path and never 
closes/invalidates the writer. After leaving `async with`, the writer object 
remains fully usable and any underlying resources are not deterministically 
released, which doesn’t match the linked issue’s “flush then close / close on 
exception” contract. Consider adding an explicit `close()` (even if initially a 
no-op) and calling it from `__aexit__`, or otherwise marking the writer as 
closed so further writes fail fast.
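A sketch of the "flush then close / close on exception" contract with fail-fast writes, in pure Python. `ClosableAppendWriter`, its `closed` flag and the error message are hypothetical, and `flush()` is simulated with an in-memory buffer; the `try`/`finally` ensures the writer is closed even if the flush itself raises.

```python
import asyncio


class ClosableAppendWriter:
    """Hypothetical append writer that rejects writes once closed."""

    def __init__(self):
        self.pending = []
        self.acked = []
        self.closed = False

    def append(self, row: dict) -> None:
        if self.closed:
            raise RuntimeError("writer is closed")
        self.pending.append(row)

    async def flush(self) -> None:
        # Stand-in for waiting until the server acknowledges pending rows.
        await asyncio.sleep(0)
        self.acked.extend(self.pending)
        self.pending.clear()

    def close(self) -> None:
        self.closed = True

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        try:
            if exc_type is None:
                await self.flush()  # success path: flush first
        finally:
            self.close()  # both paths (and a failed flush) end the lifecycle
        return False


async def demo():
    async with ClosableAppendWriter() as writer:
        writer.append({"a": 1})
        writer.append({"a": 2})

    aborted = ClosableAppendWriter()
    try:
        async with aborted:
            aborted.append({"a": 100})
            raise ValueError("abort")
    except ValueError:
        pass

    try:
        writer.append({"a": 3})  # use after the block fails fast
    except RuntimeError as err:
        return writer, aborted, str(err)
    return writer, aborted, None


writer, aborted, error = asyncio.run(demo())
```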



##########
bindings/python/src/upsert.rs:
##########
@@ -108,6 +109,37 @@ impl UpsertWriter {
         })
     }
 
+    // Enter the async runtime context (for 'async with' statement)
+    fn __aenter__<'py>(slf: PyRef<'py, Self>, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
+        let py_slf = slf.into_pyobject(py)?.unbind();
+        future_into_py(py, async move { Ok(py_slf) })
+    }
+
+    // Exit the async runtime context (for 'async with' statement)
+    /// On successful exit, the writer is automatically flushed.
+    /// If an exception occurs, the flush is skipped to allow immediate error
+    /// propagation, though pending records may still be sent in the background.
+    #[pyo3(signature = (exc_type=None, _exc_value=None, _traceback=None))]
+    fn __aexit__<'py>(
+        &self,
+        py: Python<'py>,
+        exc_type: Option<Bound<'py, PyAny>>,
+        _exc_value: Option<Bound<'py, PyAny>>,
+        _traceback: Option<Bound<'py, PyAny>>,
+    ) -> PyResult<Bound<'py, PyAny>> {
+        let has_error = exc_type.is_some();
+        let writer = self.writer.clone();
+        future_into_py(py, async move {
+            if !has_error {
+                writer
+                    .flush()
+                    .await
+                    .map_err(|e| FlussError::from_core_error(&e))?;
+            }
+            Ok(false)
+        })

Review Comment:
   `UpsertWriter.__aexit__` only flushes on the success path and does not 
close/invalidate the writer in either path. This means leaving an `async with` 
block does not actually end the writer’s lifecycle, which diverges from the 
linked issue’s expected semantics. Consider adding/using an explicit `close()` 
and invoking it from `__aexit__` (flush+close on success; close-only on 
exception).
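To make the requested ordering concrete, here is the same control flow written with the standard `contextlib.asynccontextmanager` decorator, purely as an illustration: `RecordingUpsertWriter` and `upsert_session` are hypothetical, and the binding itself would implement this in Rust inside `__aexit__`. On success it flushes then closes; on an exception it only closes and re-raises.

```python
import asyncio
from contextlib import asynccontextmanager


class RecordingUpsertWriter:
    """Hypothetical writer that records which lifecycle calls were made."""

    def __init__(self):
        self.calls = []

    def upsert(self, row: dict) -> None:
        self.calls.append(("upsert", row["id"]))

    async def flush(self) -> None:
        self.calls.append(("flush",))

    def close(self) -> None:
        self.calls.append(("close",))


@asynccontextmanager
async def upsert_session(writer):
    """Flush + close on success; close only on exception."""
    try:
        yield writer
    except BaseException:
        writer.close()
        raise
    else:
        try:
            await writer.flush()
        finally:
            writer.close()


class Abort(Exception):
    pass


async def demo():
    ok = RecordingUpsertWriter()
    async with upsert_session(ok) as w:
        w.upsert({"id": 1, "v": "a"})

    failed = RecordingUpsertWriter()
    try:
        async with upsert_session(failed) as w:
            w.upsert({"id": 2, "v": "b"})
            raise Abort("abort")
    except Abort:
        pass
    return ok.calls, failed.calls


ok_calls, failed_calls = asyncio.run(demo())
```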



##########
bindings/python/example/example.py:
##########
@@ -489,9 +488,10 @@ async def main():
         )
         print("Queued user_id=3 (Charlie)")
 
-        # flush() waits for all queued writes to be acknowledged by the server
-        await upsert_writer.flush()
-        print("Flushed — all 3 rows acknowledged by server")
+        # flush() and close() are automatically called by the 'async with' block on successful exit.
+        # Bypass manual flush:

Review Comment:
   This comment claims the `async with` block will automatically call both 
`flush()` and `close()`, but `UpsertWriter.__aexit__` currently only flushes 
and does not close/invalidate the writer. Please update the example comment to 
reflect reality (or add close semantics and invoke them from `__aexit__`).
   ```suggestion
           # flush() is automatically called by the 'async with' block on successful exit.
           # No manual flush is needed here:
   ```



##########
bindings/python/example/example.py:
##########
@@ -240,10 +239,10 @@ async def main():
         append_writer.write_pandas(df)
         print("Successfully wrote Pandas DataFrame")
 
-        # Flush all pending data
-        print("\n--- Flushing data ---")
-        await append_writer.flush()
-        print("Successfully flushed data")
+        # Note: flush() and close() are automatically called by the 'async with' block on successful exit.

Review Comment:
   This comment says `flush()` and `close()` are automatically called when 
leaving the `async with` block, but the current `AppendWriter.__aexit__` 
implementation only flushes and there is no writer `close()` in the Python API. 
Please update the example text to match the actual behavior (or 
implement/introduce `close()` and call it from `__aexit__`).
   ```suggestion
           # Note: flush() is automatically called by the 'async with' block on successful exit.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
