This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 6b86f17 chore: remove async for upsert/delete/append in python (#280)
6b86f17 is described below
commit 6b86f170f2b5448f0efc568a7f5d12710ede77b7
Author: yuxia Luo <[email protected]>
AuthorDate: Sun Feb 8 12:21:37 2026 +0800
chore: remove async for upsert/delete/append in python (#280)
---
bindings/python/fluss/__init__.pyi | 33 ++++++++++++++++++++++++++-------
1 file changed, 26 insertions(+), 7 deletions(-)
diff --git a/bindings/python/fluss/__init__.pyi b/bindings/python/fluss/__init__.pyi
index ceef155..50f3b20 100644
--- a/bindings/python/fluss/__init__.pyi
+++ b/bindings/python/fluss/__init__.pyi
@@ -301,13 +301,16 @@ class FlussTable:
def __repr__(self) -> str: ...
class AppendWriter:
- async def append(self, row: dict | list | tuple) -> None:
+ def append(self, row: dict | list | tuple) -> WriteResultHandle:
"""Append a single row to the table.
Args:
row: Dictionary mapping field names to values, or
list/tuple of values in schema order
+ Returns:
+ WriteResultHandle: Ignore for fire-and-forget, or await handle.wait() for acknowledgement.
+
Supported Types:
Currently supports primitive types only:
- Boolean, TinyInt, SmallInt, Int, BigInt (integers)
@@ -319,8 +322,8 @@ class AppendWriter:
Temporal types (Date, Timestamp, Decimal) are not yet supported.
Example:
- await writer.append({'id': 1, 'name': 'Alice', 'score': 95.5})
- await writer.append([1, 'Alice', 95.5])
+ writer.append({'id': 1, 'name': 'Alice', 'score': 95.5})
+ writer.append([1, 'Alice', 95.5])
Note:
For high-throughput bulk loading, prefer write_arrow_batch().
@@ -328,15 +331,15 @@ class AppendWriter:
"""
...
def write_arrow(self, table: pa.Table) -> None: ...
- def write_arrow_batch(self, batch: pa.RecordBatch) -> None: ...
+ def write_arrow_batch(self, batch: pa.RecordBatch) -> WriteResultHandle: ...
def write_pandas(self, df: pd.DataFrame) -> None: ...
- def flush(self) -> None: ...
+ async def flush(self) -> None: ...
def __repr__(self) -> str: ...
class UpsertWriter:
"""Writer for upserting and deleting data in a Fluss primary key table."""
- async def upsert(self, row: dict | list | tuple) -> None:
+ def upsert(self, row: dict | list | tuple) -> WriteResultHandle:
"""Upsert a row into the table.
If a row with the same primary key exists, it will be updated.
@@ -345,14 +348,20 @@ class UpsertWriter:
Args:
row: Dictionary mapping field names to values, or
list/tuple of values in schema order
+
+ Returns:
+ WriteResultHandle: Ignore for fire-and-forget, or await handle.wait() for ack.
"""
...
- async def delete(self, pk: dict | list | tuple) -> None:
+ def delete(self, pk: dict | list | tuple) -> WriteResultHandle:
"""Delete a row from the table by primary key.
Args:
pk: Dictionary with PK column names as keys, or
list/tuple of PK values in PK column order
+
+ Returns:
+ WriteResultHandle: Ignore for fire-and-forget, or await handle.wait() for ack.
"""
...
async def flush(self) -> None:
@@ -360,6 +369,16 @@ class UpsertWriter:
...
def __repr__(self) -> str: ...
+
+class WriteResultHandle:
+ """Handle for a pending write (append/upsert/delete). Ignore for fire-and-forget, or await handle.wait() for ack."""
+
+ async def wait(self) -> None:
+ """Wait for server acknowledgment of this write."""
+ ...
+ def __repr__(self) -> str: ...
+
+
class Lookuper:
"""Lookuper for performing primary key lookups on a Fluss table."""