leekeiabstraction commented on code in PR #438:
URL: https://github.com/apache/fluss-rust/pull/438#discussion_r3060786254
##########
bindings/python/test/test_log_table.py:
##########
@@ -729,6 +729,382 @@ async def test_scan_records_indexing_and_slicing(connection, admin):
await admin.drop_table(table_path, ignore_if_not_exists=False)
+async def test_async_iterator(connection, admin):
+ """Test the Python asynchronous iterator loop (`async for`) on LogScanner."""
+ table_path = fluss.TablePath("fluss", "py_test_async_iterator")
+ await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+ schema = fluss.Schema(
+ pa.schema([pa.field("id", pa.int32()), pa.field("val", pa.string())])
+ )
+ await admin.create_table(table_path, fluss.TableDescriptor(schema))
+
+ table = await connection.get_table(table_path)
+ writer = table.new_append().create_writer()
+
+ # Write 5 records
+ writer.write_arrow_batch(
+ pa.RecordBatch.from_arrays(
+ [pa.array(list(range(1, 6)), type=pa.int32()),
+ pa.array([f"async{i}" for i in range(1, 6)])],
+ schema=pa.schema([pa.field("id", pa.int32()), pa.field("val", pa.string())]),
+ )
+ )
+ await writer.flush()
+
+ scanner = await table.new_scan().create_log_scanner()
+ num_buckets = (await admin.get_table_info(table_path)).num_buckets
+ scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)})
+
+ collected = []
+
+ # Here is the magical Issue #424 async iterator logic at work:
+ async def consume_scanner():
+ async for record in scanner:
+ collected.append(record)
+ if len(collected) == 5:
+ break
+
+ # We must race the consumption against a timeout so the test doesn't hang if the iterator is broken
+ await asyncio.wait_for(consume_scanner(), timeout=10.0)
+
+ assert len(collected) == 5, f"Expected 5 records, got {len(collected)}"
+
+ collected.sort(key=lambda r: r.row["id"])
+ for i, record in enumerate(collected):
+ assert record.row["id"] == i + 1
+ assert record.row["val"] == f"async{i + 1}"
+
+ await admin.drop_table(table_path, ignore_if_not_exists=False)
+
+
+async def test_async_iterator_break_no_leak(connection, admin):
+ """Verify that breaking out of `async for` does not leak resources.
+
+ After breaking, the scanner must still be usable for synchronous
+ `poll()` calls. If the old implementation's tokio::spawn'd task
+ were still alive, it would hold the Mutex and cause `poll()` to
+ deadlock or error.
+ """
+ table_path = fluss.TablePath("fluss", "py_test_async_break_leak")
+ await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+ schema = fluss.Schema(
+ pa.schema([pa.field("id", pa.int32()), pa.field("val", pa.string())])
+ )
+ await admin.create_table(table_path, fluss.TableDescriptor(schema))
+
+ table = await connection.get_table(table_path)
+ writer = table.new_append().create_writer()
+ writer.write_arrow_batch(
+ pa.RecordBatch.from_arrays(
+ [
+ pa.array(list(range(1, 11)), type=pa.int32()),
+ pa.array([f"v{i}" for i in range(1, 11)]),
+ ],
+ schema=pa.schema(
+ [pa.field("id", pa.int32()), pa.field("val", pa.string())]
+ ),
+ )
+ )
+ await writer.flush()
+
+ scanner = await table.new_scan().create_log_scanner()
+ num_buckets = (await admin.get_table_info(table_path)).num_buckets
+ scanner.subscribe_buckets(
+ {i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}
+ )
+
+ # Phase 1: async for with early break (collect only 3 of 10)
+ collected_async = []
+
+ async def consume_and_break():
+ async for record in scanner:
+ collected_async.append(record)
+ if len(collected_async) >= 3:
+ break
+
+ await asyncio.wait_for(consume_and_break(), timeout=10.0)
+ assert len(collected_async) == 3, (
+ f"Expected 3 records from async for, got {len(collected_async)}"
+ )
+
+ # Phase 2: sync poll() must still work — proves no leaked task / lock.
+ # With small data and few buckets, _async_poll may have fetched all
+ # records in one batch. After break, the un-yielded records from that
+ # batch are lost. So sync poll may return 0 records — the key assertion
+ # is that poll() completes without deadlock (returns within timeout).
+ remaining = scanner.poll(2000)
+ assert remaining is not None, "poll() should return (not deadlock)"
+
+ # If we got records, verify no duplicates
+ async_ids = {r.row["id"] for r in collected_async}
+ sync_ids = {r.row["id"] for r in remaining}
+ assert async_ids.isdisjoint(sync_ids), (
+ f"Duplicate IDs between async and sync: {async_ids & sync_ids}"
+ )
+
+ # All IDs must be from the original 1-10 range
+ all_ids = async_ids | sync_ids
+ assert all_ids.issubset(set(range(1, 11))), (
+ f"Unexpected IDs: {all_ids - set(range(1, 11))}"
+ )
+
+ await admin.drop_table(table_path, ignore_if_not_exists=False)
+
+
+async def test_async_iterator_multiple_batches(connection, admin):
+ """Verify async iteration works across multiple network poll cycles.
+
+ _async_poll does a single bounded poll per call. Writing 20 records
Review Comment:
Q: How does writing 20 records trigger multiple batches?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]