This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 7f20192  chore: Fire-and-forget behaviour for effecient batching (#258)
7f20192 is described below

commit 7f20192b45cc0b7bb69bf823a61d76d41f9f7d3f
Author: Anton Borisov <[email protected]>
AuthorDate: Sat Feb 7 02:17:35 2026 +0000

    chore: Fire-and-forget behaviour for effecient batching (#258)
---
 bindings/cpp/examples/example.cpp                  | 16 +++++-
 bindings/cpp/include/fluss.hpp                     | 27 ++++++++++
 bindings/cpp/src/lib.rs                            | 37 +++++++++++---
 bindings/cpp/src/table.cpp                         | 57 ++++++++++++++++++++--
 bindings/python/example/example.py                 | 39 ++++++++-------
 bindings/python/src/table.rs                       | 54 ++++++++++++++++----
 bindings/python/src/upsert.rs                      | 47 +++++++++++++-----
 crates/examples/src/example_kv_table.rs            |  5 +-
 .../examples/src/example_partitioned_kv_table.rs   |  5 +-
 crates/examples/src/example_table.rs               |  8 +--
 crates/fluss/src/client/table/append.rs            | 30 +++++++++---
 crates/fluss/src/client/table/upsert.rs            | 30 +++++++-----
 crates/fluss/src/client/write/mod.rs               | 41 ++++++++++++++++
 crates/fluss/tests/integration/kv_table.rs         | 53 ++++++++++++++------
 crates/fluss/tests/integration/log_table.rs        |  4 ++
 15 files changed, 360 insertions(+), 93 deletions(-)

diff --git a/bindings/cpp/examples/example.cpp 
b/bindings/cpp/examples/example.cpp
index 7022cad..10266c7 100644
--- a/bindings/cpp/examples/example.cpp
+++ b/bindings/cpp/examples/example.cpp
@@ -91,6 +91,7 @@ int main() {
         {3, "Charlie", 92.1f, 35},
     };
 
+    // Fire-and-forget: queue rows, flush at end
     for (const auto& r : rows) {
         fluss::GenericRow row;
         row.SetInt32(0, r.id);
@@ -100,7 +101,20 @@ int main() {
         check("append", writer.Append(row));
     }
     check("flush", writer.Flush());
-    std::cout << "Wrote " << rows.size() << " rows" << std::endl;
+    std::cout << "Wrote " << rows.size() << " rows (fire-and-forget + flush)" 
<< std::endl;
+
+    // Per-record acknowledgment
+    {
+        fluss::GenericRow row;
+        row.SetInt32(0, 100);
+        row.SetString(1, "AckTest");
+        row.SetFloat32(2, 99.9f);
+        row.SetInt32(3, 42);
+        fluss::WriteResult wr;
+        check("append", writer.Append(row, wr));
+        check("wait", wr.Wait());
+        std::cout << "Row acknowledged by server" << std::endl;
+    }
 
     // 6) Scan
     fluss::LogScanner scanner;
diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp
index 9461f68..6c20717 100644
--- a/bindings/cpp/include/fluss.hpp
+++ b/bindings/cpp/include/fluss.hpp
@@ -37,6 +37,7 @@ namespace ffi {
     struct Admin;
     struct Table;
     struct AppendWriter;
+    struct WriteResult;
     struct LogScanner;
 }  // namespace ffi
 
@@ -409,6 +410,7 @@ struct PartitionInfo {
 };
 
 class AppendWriter;
+class WriteResult;
 class LogScanner;
 class Admin;
 class Table;
@@ -534,6 +536,30 @@ private:
     std::vector<size_t> projection_;
 };
 
+class WriteResult {
+public:
+    WriteResult() noexcept;
+    ~WriteResult() noexcept;
+
+    WriteResult(const WriteResult&) = delete;
+    WriteResult& operator=(const WriteResult&) = delete;
+    WriteResult(WriteResult&& other) noexcept;
+    WriteResult& operator=(WriteResult&& other) noexcept;
+
+    bool Available() const;
+
+    /// Wait for server acknowledgment of the write.
+    /// For fire-and-forget, simply let the WriteResult go out of scope.
+    Result Wait();
+
+private:
+    friend class AppendWriter;
+    WriteResult(ffi::WriteResult* inner) noexcept;
+
+    void Destroy() noexcept;
+    ffi::WriteResult* inner_{nullptr};
+};
+
 class AppendWriter {
 public:
     AppendWriter() noexcept;
@@ -547,6 +573,7 @@ public:
     bool Available() const;
 
     Result Append(const GenericRow& row);
+    Result Append(const GenericRow& row, WriteResult& out);
     Result Flush();
 
 private:
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index afdb7c0..7944c10 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -190,6 +190,7 @@ mod ffi {
         type Admin;
         type Table;
         type AppendWriter;
+        type WriteResult;
         type LogScanner;
 
         // Connection
@@ -253,9 +254,12 @@ mod ffi {
 
         // AppendWriter
         unsafe fn delete_append_writer(writer: *mut AppendWriter);
-        fn append(self: &mut AppendWriter, row: &FfiGenericRow) -> FfiResult;
+        fn append(self: &mut AppendWriter, row: &FfiGenericRow) -> 
Result<Box<WriteResult>>;
         fn flush(self: &mut AppendWriter) -> FfiResult;
 
+        // WriteResult — dropped automatically via rust::Box, or call wait() 
for ack
+        fn wait(self: &mut WriteResult) -> FfiResult;
+
         // LogScanner
         unsafe fn delete_log_scanner(scanner: *mut LogScanner);
         fn subscribe(self: &LogScanner, bucket_id: i32, start_offset: i64) -> 
FfiResult;
@@ -299,6 +303,10 @@ pub struct AppendWriter {
     inner: fcore::client::AppendWriter,
 }
 
+pub struct WriteResult {
+    inner: Option<fcore::client::WriteResultFuture>,
+}
+
 pub struct LogScanner {
     inner: Option<fcore::client::LogScanner>,
     inner_batch: Option<fcore::client::RecordBatchLogScanner>,
@@ -750,15 +758,16 @@ unsafe fn delete_append_writer(writer: *mut AppendWriter) 
{
 }
 
 impl AppendWriter {
-    fn append(&mut self, row: &ffi::FfiGenericRow) -> ffi::FfiResult {
+    fn append(&mut self, row: &ffi::FfiGenericRow) -> Result<Box<WriteResult>, 
String> {
         let generic_row = types::ffi_row_to_core(row);
 
-        let result = RUNTIME.block_on(async { 
self.inner.append(&generic_row).await });
+        let result_future = RUNTIME
+            .block_on(async { self.inner.append(&generic_row).await })
+            .map_err(|e| format!("Failed to append: {e}"))?;
 
-        match result {
-            Ok(_) => ok_result(),
-            Err(e) => err_result(1, e.to_string()),
-        }
+        Ok(Box::new(WriteResult {
+            inner: Some(result_future),
+        }))
     }
 
     fn flush(&mut self) -> ffi::FfiResult {
@@ -771,6 +780,20 @@ impl AppendWriter {
     }
 }
 
+impl WriteResult {
+    fn wait(&mut self) -> ffi::FfiResult {
+        if let Some(future) = self.inner.take() {
+            let result = RUNTIME.block_on(future);
+            match result {
+                Ok(_) => ok_result(),
+                Err(e) => err_result(1, e.to_string()),
+            }
+        } else {
+            ok_result()
+        }
+    }
+}
+
 // LogScanner implementation
 unsafe fn delete_log_scanner(scanner: *mut LogScanner) {
     if !scanner.is_null() {
diff --git a/bindings/cpp/src/table.cpp b/bindings/cpp/src/table.cpp
index efb762b..24be8d4 100644
--- a/bindings/cpp/src/table.cpp
+++ b/bindings/cpp/src/table.cpp
@@ -152,6 +152,45 @@ bool Table::HasPrimaryKey() const {
     return table_->has_primary_key();
 }
 
+// WriteResult implementation
+WriteResult::WriteResult() noexcept = default;
+
+WriteResult::WriteResult(ffi::WriteResult* inner) noexcept : inner_(inner) {}
+
+WriteResult::~WriteResult() noexcept { Destroy(); }
+
+void WriteResult::Destroy() noexcept {
+    if (inner_) {
+        // Reconstruct the rust::Box to let Rust drop the value
+        rust::Box<ffi::WriteResult>::from_raw(inner_);
+        inner_ = nullptr;
+    }
+}
+
+WriteResult::WriteResult(WriteResult&& other) noexcept : inner_(other.inner_) {
+    other.inner_ = nullptr;
+}
+
+WriteResult& WriteResult::operator=(WriteResult&& other) noexcept {
+    if (this != &other) {
+        Destroy();
+        inner_ = other.inner_;
+        other.inner_ = nullptr;
+    }
+    return *this;
+}
+
+bool WriteResult::Available() const { return inner_ != nullptr; }
+
+Result WriteResult::Wait() {
+    if (!Available()) {
+        return utils::make_ok();
+    }
+
+    auto ffi_result = inner_->wait();
+    return utils::from_ffi_result(ffi_result);
+}
+
 // AppendWriter implementation
 AppendWriter::AppendWriter() noexcept = default;
 
@@ -182,13 +221,25 @@ AppendWriter& AppendWriter::operator=(AppendWriter&& 
other) noexcept {
 bool AppendWriter::Available() const { return writer_ != nullptr; }
 
 Result AppendWriter::Append(const GenericRow& row) {
+    WriteResult wr;
+    return Append(row, wr);
+}
+
+Result AppendWriter::Append(const GenericRow& row, WriteResult& out) {
     if (!Available()) {
         return utils::make_error(1, "AppendWriter not available");
     }
 
-    auto ffi_row = utils::to_ffi_generic_row(row);
-    auto ffi_result = writer_->append(ffi_row);
-    return utils::from_ffi_result(ffi_result);
+    try {
+        auto ffi_row = utils::to_ffi_generic_row(row);
+        auto rust_box = writer_->append(ffi_row);
+        out.inner_ = rust_box.into_raw();
+        return utils::make_ok();
+    } catch (const rust::Error& e) {
+        return utils::make_error(1, e.what());
+    } catch (const std::exception& e) {
+        return utils::make_error(1, e.what());
+    }
 }
 
 Result AppendWriter::Flush() {
diff --git a/bindings/python/example/example.py 
b/bindings/python/example/example.py
index 8735038..1cabaa5 100644
--- a/bindings/python/example/example.py
+++ b/bindings/python/example/example.py
@@ -417,11 +417,13 @@ async def main():
     print(f"Has primary key: {pk_table.has_primary_key()}")
 
     # --- Test Upsert ---
-    print("\n--- Testing Upsert ---")
+    print("\n--- Testing Upsert (fire-and-forget) ---")
     try:
         upsert_writer = pk_table.new_upsert()
         print(f"Created upsert writer: {upsert_writer}")
 
+        # Fire-and-forget: queue writes without waiting for individual acks.
+        # Records are batched internally for efficiency.
         await upsert_writer.upsert(
             {
                 "user_id": 1,
@@ -437,7 +439,7 @@ async def main():
                 "balance": Decimal("1234.56"),
             }
         )
-        print("Upserted user_id=1 (Alice)")
+        print("Queued user_id=1 (Alice)")
 
         await upsert_writer.upsert(
             {
@@ -452,7 +454,7 @@ async def main():
                 "balance": Decimal("5678.91"),
             }
         )
-        print("Upserted user_id=2 (Bob)")
+        print("Queued user_id=2 (Bob)")
 
         await upsert_writer.upsert(
             {
@@ -467,10 +469,17 @@ async def main():
                 "balance": Decimal("9876.54"),
             }
         )
-        print("Upserted user_id=3 (Charlie)")
+        print("Queued user_id=3 (Charlie)")
 
-        # Update an existing row (same PK, different values)
-        await upsert_writer.upsert(
+        # flush() waits for all queued writes to be acknowledged by the server
+        await upsert_writer.flush()
+        print("Flushed — all 3 rows acknowledged by server")
+
+        # Per-record acknowledgment: await the returned handle to block until
+        # the server confirms this specific write, useful when you need to
+        # read-after-write or verify critical updates.
+        print("\n--- Testing Upsert (per-record acknowledgment) ---")
+        ack = await upsert_writer.upsert(
             {
                 "user_id": 1,
                 "name": "Alice Updated",
@@ -485,11 +494,8 @@ async def main():
                 "balance": Decimal("2345.67"),
             }
         )
-        print("Updated user_id=1 (Alice -> Alice Updated)")
-
-        # Explicit flush to ensure all upserts are acknowledged
-        await upsert_writer.flush()
-        print("Flushed all upserts")
+        await ack  # wait for server acknowledgment before proceeding
+        print("Updated user_id=1 (Alice -> Alice Updated) — server 
acknowledged")
 
     except Exception as e:
         print(f"Error during upsert: {e}")
@@ -548,13 +554,10 @@ async def main():
     try:
         upsert_writer = pk_table.new_upsert()
 
-        # Delete only needs PK columns - much simpler API!
-        await upsert_writer.delete({"user_id": 3})
-        print("Deleted user_id=3")
-
-        # Explicit flush to ensure delete is acknowledged
-        await upsert_writer.flush()
-        print("Flushed delete")
+        # Per-record ack for delete — await the handle to confirm deletion
+        ack = await upsert_writer.delete({"user_id": 3})
+        await ack
+        print("Deleted user_id=3 — server acknowledged")
 
         lookuper = pk_table.new_lookup()
         result = await lookuper.lookup({"user_id": 3})
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index 7184c8d..e987d43 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -526,35 +526,59 @@ pub struct AppendWriter {
 
 #[pymethods]
 impl AppendWriter {
-    /// Write Arrow table data
+    /// Write Arrow table data (fire-and-forget, use flush() to ensure 
delivery)
     pub fn write_arrow(&self, py: Python, table: Py<PyAny>) -> PyResult<()> {
         // Convert Arrow Table to batches and write each batch
         let batches = table.call_method0(py, "to_batches")?;
         let batch_list: Vec<Py<PyAny>> = batches.extract(py)?;
 
         for batch in batch_list {
-            self.write_arrow_batch(py, batch)?;
+            // Drop the ack coroutine — fire-and-forget
+            let _ = self.write_arrow_batch(py, batch)?;
         }
         Ok(())
     }
 
     /// Write Arrow batch data
-    pub fn write_arrow_batch(&self, py: Python, batch: Py<PyAny>) -> 
PyResult<()> {
+    ///
+    /// Returns:
+    ///     A coroutine that can be awaited for server acknowledgment,
+    ///     or ignored for fire-and-forget behavior.
+    pub fn write_arrow_batch<'py>(
+        &self,
+        py: Python<'py>,
+        batch: Py<PyAny>,
+    ) -> PyResult<Bound<'py, PyAny>> {
         // This shares the underlying Arrow buffers without copying data
         let batch_bound = batch.bind(py);
         let rust_batch: ArrowRecordBatch = 
FromPyArrow::from_pyarrow_bound(batch_bound)
             .map_err(|e| FlussError::new_err(format!("Failed to convert 
RecordBatch: {e}")))?;
 
         let inner = self.inner.clone();
-        // Release the GIL before blocking on async operation
-        let result = py.detach(|| {
-            TOKIO_RUNTIME.block_on(async { 
inner.append_arrow_batch(rust_batch).await })
-        });
 
-        result.map_err(|e| FlussError::new_err(e.to_string()))
+        future_into_py(py, async move {
+            let result_future = inner
+                .append_arrow_batch(rust_batch)
+                .await
+                .map_err(|e| FlussError::new_err(e.to_string()))?;
+
+            Python::attach(|py| {
+                future_into_py(py, async move {
+                    result_future
+                        .await
+                        .map_err(|e| FlussError::new_err(e.to_string()))?;
+                    Ok(())
+                })
+                .map(|bound| bound.unbind())
+            })
+        })
     }
 
     /// Append a single row to the table
+    ///
+    /// Returns:
+    ///     A coroutine that can be awaited for server acknowledgment,
+    ///     or ignored for fire-and-forget behavior.
     pub fn append<'py>(
         &self,
         py: Python<'py>,
@@ -564,10 +588,20 @@ impl AppendWriter {
         let inner = self.inner.clone();
 
         future_into_py(py, async move {
-            inner
+            let result_future = inner
                 .append(&generic_row)
                 .await
-                .map_err(|e| FlussError::new_err(e.to_string()))
+                .map_err(|e| FlussError::new_err(e.to_string()))?;
+
+            Python::attach(|py| {
+                future_into_py(py, async move {
+                    result_future
+                        .await
+                        .map_err(|e| FlussError::new_err(e.to_string()))?;
+                    Ok(())
+                })
+                .map(|bound| bound.unbind())
+            })
         })
     }
 
diff --git a/bindings/python/src/upsert.rs b/bindings/python/src/upsert.rs
index 08b3597..5c10ded 100644
--- a/bindings/python/src/upsert.rs
+++ b/bindings/python/src/upsert.rs
@@ -23,16 +23,21 @@ use tokio::sync::Mutex;
 
 /// Writer for upserting and deleting data in a Fluss primary key table.
 ///
-/// Each upsert/delete operation is sent to the server and waits for 
acknowledgment.
-/// Multiple concurrent writers share a common WriterClient which batches 
requests
-/// for efficiency.
+/// Each upsert/delete operation queues the write and returns a coroutine
+/// that can be awaited for per-record acknowledgment, or ignored for
+/// fire-and-forget semantics (call `flush()` to ensure delivery).
 ///
 /// # Example:
 ///     writer = table.new_upsert()
+///
+///     # Fire-and-forget with flush
 ///     await writer.upsert(row1)
 ///     await writer.upsert(row2)
-///     await writer.delete(pk)
-///     await writer.flush()  # Ensures all pending operations are acknowledged
+///     await writer.flush()
+///
+///     # Or await individual acknowledgment
+///     ack = await writer.upsert(row3)
+///     await ack
 #[pyclass]
 pub struct UpsertWriter {
     inner: Arc<UpsertWriterInner>,
@@ -58,7 +63,8 @@ impl UpsertWriter {
     ///          For list/tuple: values must be in schema order.
     ///
     /// Returns:
-    ///     None on success
+    ///     A coroutine that can be awaited for server acknowledgment,
+    ///     or ignored for fire-and-forget behavior.
     pub fn upsert<'py>(
         &self,
         py: Python<'py>,
@@ -70,11 +76,20 @@ impl UpsertWriter {
         future_into_py(py, async move {
             let mut guard = inner.get_or_create_writer().await?;
             let writer = guard.as_mut().unwrap();
-            writer
+            let result_future = writer
                 .upsert(&generic_row)
                 .await
                 .map_err(|e| FlussError::new_err(e.to_string()))?;
-            Ok(())
+
+            Python::attach(|py| {
+                future_into_py(py, async move {
+                    result_future
+                        .await
+                        .map_err(|e| FlussError::new_err(e.to_string()))?;
+                    Ok(())
+                })
+                .map(|bound| bound.unbind())
+            })
         })
     }
 
@@ -86,7 +101,8 @@ impl UpsertWriter {
     ///         For list/tuple: values in PK column order.
     ///
     /// Returns:
-    ///     None on success
+    ///     A coroutine that can be awaited for server acknowledgment,
+    ///     or ignored for fire-and-forget behavior.
     pub fn delete<'py>(
         &self,
         py: Python<'py>,
@@ -98,11 +114,20 @@ impl UpsertWriter {
         future_into_py(py, async move {
             let mut guard = inner.get_or_create_writer().await?;
             let writer = guard.as_mut().unwrap();
-            writer
+            let result_future = writer
                 .delete(&generic_row)
                 .await
                 .map_err(|e| FlussError::new_err(e.to_string()))?;
-            Ok(())
+
+            Python::attach(|py| {
+                future_into_py(py, async move {
+                    result_future
+                        .await
+                        .map_err(|e| FlussError::new_err(e.to_string()))?;
+                    Ok(())
+                })
+                .map(|bound| bound.unbind())
+            })
         })
     }
 
diff --git a/crates/examples/src/example_kv_table.rs 
b/crates/examples/src/example_kv_table.rs
index 2bbcc74..2fcb134 100644
--- a/crates/examples/src/example_kv_table.rs
+++ b/crates/examples/src/example_kv_table.rs
@@ -65,6 +65,7 @@ pub async fn main() -> Result<()> {
         upsert_writer.upsert(&row).await?;
         println!("Upserted: {row:?}");
     }
+    upsert_writer.flush().await?;
 
     println!("\n=== Looking up ===");
     let mut lookuper = table.new_lookup()?.create_lookuper()?;
@@ -84,7 +85,7 @@ pub async fn main() -> Result<()> {
     row.set_field(0, 1);
     row.set_field(1, "Verso");
     row.set_field(2, 33i64);
-    upsert_writer.upsert(&row).await?;
+    upsert_writer.upsert(&row).await?.await?;
     println!("Updated: {row:?}");
 
     let result = lookuper.lookup(&make_key(1)).await?;
@@ -99,7 +100,7 @@ pub async fn main() -> Result<()> {
     // For delete, only primary key field needs to be set; other fields can 
remain null
     let mut row = GenericRow::new(3);
     row.set_field(0, 2);
-    upsert_writer.delete(&row).await?;
+    upsert_writer.delete(&row).await?.await?;
     println!("Deleted row with id=2");
 
     let result = lookuper.lookup(&make_key(2)).await?;
diff --git a/crates/examples/src/example_partitioned_kv_table.rs 
b/crates/examples/src/example_partitioned_kv_table.rs
index 884869e..feb8f05 100644
--- a/crates/examples/src/example_partitioned_kv_table.rs
+++ b/crates/examples/src/example_partitioned_kv_table.rs
@@ -77,6 +77,7 @@ pub async fn main() -> Result<()> {
         upsert_writer.upsert(&row).await?;
         println!("Upserted: {row:?}");
     }
+    upsert_writer.flush().await?;
 
     println!("\n=== Looking up ===");
     let mut lookuper = table.new_lookup()?.create_lookuper()?;
@@ -101,7 +102,7 @@ pub async fn main() -> Result<()> {
     row.set_field(1, "APAC");
     row.set_field(2, 1i64);
     row.set_field(3, 4321i64);
-    upsert_writer.upsert(&row).await?;
+    upsert_writer.upsert(&row).await?.await?;
     println!("Updated: {row:?}");
 
     let result = lookuper.lookup(&make_key(1001, "APAC", 1)).await?;
@@ -117,7 +118,7 @@ pub async fn main() -> Result<()> {
     row.set_field(0, 1002);
     row.set_field(1, "EMEA");
     row.set_field(2, 2i64);
-    upsert_writer.delete(&row).await?;
+    upsert_writer.delete(&row).await?.await?;
     println!("Deleted: {row:?}");
 
     let result = lookuper.lookup(&make_key(1002, "EMEA", 2)).await?;
diff --git a/crates/examples/src/example_table.rs 
b/crates/examples/src/example_table.rs
index 733b13e..ee9bc7b 100644
--- a/crates/examples/src/example_table.rs
+++ b/crates/examples/src/example_table.rs
@@ -25,7 +25,6 @@ use fluss::error::Result;
 use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
 use fluss::row::{GenericRow, InternalRow};
 use std::time::Duration;
-use tokio::try_join;
 
 #[tokio::main]
 pub async fn main() -> Result<()> {
@@ -64,13 +63,14 @@ pub async fn main() -> Result<()> {
 
     let table = conn.get_table(&table_path).await?;
     let append_writer = table.new_append()?.create_writer()?;
-    let f1 = append_writer.append(&row);
+    // Fire-and-forget: queue writes then flush
+    append_writer.append(&row).await?;
     let mut row = GenericRow::new(3);
     row.set_field(0, 233333);
     row.set_field(1, "tt44");
     row.set_field(2, 987_654_321_987i64);
-    let f2 = append_writer.append(&row);
-    try_join!(f1, f2, append_writer.flush())?;
+    append_writer.append(&row).await?;
+    append_writer.flush().await?;
 
     // scan rows
     let log_scanner = table.new_scan().create_log_scanner()?;
diff --git a/crates/fluss/src/client/table/append.rs 
b/crates/fluss/src/client/table/append.rs
index ace91a6..e26b61a 100644
--- a/crates/fluss/src/client/table/append.rs
+++ b/crates/fluss/src/client/table/append.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 use crate::client::table::partition_getter::{PartitionGetter, 
get_physical_path};
-use crate::client::{WriteRecord, WriterClient};
+use crate::client::{WriteRecord, WriteResultFuture, WriterClient};
 use crate::error::Result;
 use crate::metadata::{PhysicalTablePath, TableInfo, TablePath};
 use crate::row::{ColumnarRow, InternalRow};
@@ -69,7 +69,18 @@ pub struct AppendWriter {
 }
 
 impl AppendWriter {
-    pub async fn append<R: InternalRow>(&self, row: &R) -> Result<()> {
+    /// Appends a row to the table.
+    ///
+    /// This method returns a [`WriteResultFuture`] immediately after queueing 
the write,
+    /// enabling fire-and-forget semantics for efficient batching.
+    ///
+    /// # Arguments
+    /// * row - the row to append.
+    ///
+    /// # Returns
+    /// A [`WriteResultFuture`] that can be awaited to wait for server 
acknowledgment,
+    /// or dropped for fire-and-forget behavior (use `flush()` to ensure 
delivery).
+    pub async fn append<R: InternalRow>(&self, row: &R) -> 
Result<WriteResultFuture> {
         let physical_table_path = Arc::new(get_physical_path(
             &self.table_path,
             self.partition_getter.as_ref(),
@@ -82,15 +93,21 @@ impl AppendWriter {
             row,
         );
         let result_handle = self.writer_client.send(&record).await?;
-        let result = result_handle.wait().await?;
-        result_handle.result(result)
+        Ok(WriteResultFuture::new(result_handle))
     }
 
     /// Appends an Arrow RecordBatch to the table.
     ///
+    /// This method returns a [`WriteResultFuture`] immediately after queueing 
the write,
+    /// enabling fire-and-forget semantics for efficient batching.
+    ///
     /// For partitioned tables, the partition is derived from the **first 
row** of the batch.
     /// Callers must ensure all rows in the batch belong to the same partition.
-    pub async fn append_arrow_batch(&self, batch: RecordBatch) -> Result<()> {
+    ///
+    /// # Returns
+    /// A [`WriteResultFuture`] that can be awaited to wait for server 
acknowledgment,
+    /// or dropped for fire-and-forget behavior (use `flush()` to ensure 
delivery).
+    pub async fn append_arrow_batch(&self, batch: RecordBatch) -> 
Result<WriteResultFuture> {
         let physical_table_path = if self.partition_getter.is_some() && 
batch.num_rows() > 0 {
             let first_row = ColumnarRow::new(Arc::new(batch.clone()));
             Arc::new(get_physical_path(
@@ -109,8 +126,7 @@ impl AppendWriter {
             batch,
         );
         let result_handle = self.writer_client.send(&record).await?;
-        let result = result_handle.wait().await?;
-        result_handle.result(result)
+        Ok(WriteResultFuture::new(result_handle))
     }
 
     pub async fn flush(&self) -> Result<()> {
diff --git a/crates/fluss/src/client/table/upsert.rs 
b/crates/fluss/src/client/table/upsert.rs
index 92f6a20..a1646cc 100644
--- a/crates/fluss/src/client/table/upsert.rs
+++ b/crates/fluss/src/client/table/upsert.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::client::{RowBytes, WriteFormat, WriteRecord, WriterClient};
+use crate::client::{RowBytes, WriteFormat, WriteRecord, WriteResultFuture, 
WriterClient};
 use crate::error::Error::{IllegalArgument, UnexpectedError};
 use crate::error::Result;
 use crate::metadata::{RowType, TableInfo, TablePath};
@@ -345,12 +345,16 @@ impl UpsertWriter {
 
     /// Inserts row into Fluss table if they do not already exist, or updates 
them if they do exist.
     ///
+    /// This method returns a [`WriteResultFuture`] immediately after queueing 
the write,
+    /// enabling fire-and-forget semantics for efficient batching.
+    ///
     /// # Arguments
     /// * row - the row to upsert.
     ///
     /// # Returns
-    /// Ok(UpsertResult) when completed normally
-    pub async fn upsert<R: InternalRow>(&self, row: &R) -> 
Result<UpsertResult> {
+    /// A [`WriteResultFuture`] that can be awaited to wait for server 
acknowledgment,
+    /// or dropped for fire-and-forget behavior (use `flush()` to ensure 
delivery).
+    pub async fn upsert<R: InternalRow>(&self, row: &R) -> 
Result<WriteResultFuture> {
         self.check_field_count(row)?;
 
         let (key, bucket_key) = self.get_keys(row)?;
@@ -376,20 +380,22 @@ impl UpsertWriter {
         );
 
         let result_handle = self.writer_client.send(&write_record).await?;
-        let result = result_handle.wait().await?;
-
-        result_handle.result(result).map(|_| UpsertResult)
+        Ok(WriteResultFuture::new(result_handle))
     }
 
     /// Delete certain row by the input row in Fluss table, the input row must 
contain the primary
     /// key.
     ///
+    /// This method returns a [`WriteResultFuture`] immediately after queueing 
the delete,
+    /// enabling fire-and-forget semantics for efficient batching.
+    ///
     /// # Arguments
-    /// * row - the row to delete.
+    /// * row - the row to delete (must contain the primary key fields).
     ///
     /// # Returns
-    /// Ok(DeleteResult) when completed normally
-    pub async fn delete<R: InternalRow>(&self, row: &R) -> 
Result<DeleteResult> {
+    /// A [`WriteResultFuture`] that can be awaited to wait for server 
acknowledgment,
+    /// or dropped for fire-and-forget behavior (use `flush()` to ensure 
delivery).
+    pub async fn delete<R: InternalRow>(&self, row: &R) -> 
Result<WriteResultFuture> {
         self.check_field_count(row)?;
 
         let (key, bucket_key) = self.get_keys(row)?;
@@ -410,9 +416,7 @@ impl UpsertWriter {
         );
 
         let result_handle = self.writer_client.send(&write_record).await?;
-        let result = result_handle.wait().await?;
-
-        result_handle.result(result).map(|_| DeleteResult)
+        Ok(WriteResultFuture::new(result_handle))
     }
 }
 
@@ -546,9 +550,11 @@ mod tests {
 /// The result of upserting a record
 /// Currently this is an empty struct to allow for compatible evolution in the 
future
 #[derive(Default)]
+#[allow(dead_code)]
 pub struct UpsertResult;
 
 /// The result of deleting a record
 /// Currently this is an empty struct to allow for compatible evolution in the 
future
 #[derive(Default)]
+#[allow(dead_code)]
 pub struct DeleteResult;
diff --git a/crates/fluss/src/client/write/mod.rs 
b/crates/fluss/src/client/write/mod.rs
index 25a0db6..49eff05 100644
--- a/crates/fluss/src/client/write/mod.rs
+++ b/crates/fluss/src/client/write/mod.rs
@@ -26,7 +26,10 @@ use crate::row::InternalRow;
 pub use accumulator::*;
 use arrow::array::RecordBatch;
 use bytes::Bytes;
+use std::future::Future;
+use std::pin::Pin;
 use std::sync::Arc;
+use std::task::{Context, Poll};
 
 pub(crate) mod broadcast;
 mod bucket_assigner;
@@ -198,3 +201,41 @@ impl ResultHandle {
         })
     }
 }
+
+/// A future that represents a pending write operation.
+///
+/// This type implements [`Future`], allowing users to either:
+/// 1. Await immediately to block on acknowledgment: 
`writer.upsert(&row).await?.await?`
+/// 2. Fire-and-forget with later flush: `writer.upsert(&row).await?; 
writer.flush().await?`
+///
+/// This pattern is similar to rdkafka's `DeliveryFuture` and allows for 
efficient batching
+/// when users don't need immediate per-record acknowledgment.
+pub struct WriteResultFuture {
+    inner: Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>,
+}
+
+impl std::fmt::Debug for WriteResultFuture {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("WriteResultFuture").finish_non_exhaustive()
+    }
+}
+
+impl WriteResultFuture {
+    /// Create a new WriteResultFuture from a ResultHandle.
+    pub fn new(result_handle: ResultHandle) -> Self {
+        Self {
+            inner: Box::pin(async move {
+                let result = result_handle.wait().await?;
+                result_handle.result(result)
+            }),
+        }
+    }
+}
+
+impl Future for WriteResultFuture {
+    type Output = Result<(), Error>;
+
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> 
Poll<Self::Output> {
+        self.inner.as_mut().poll(cx)
+    }
+}
diff --git a/crates/fluss/tests/integration/kv_table.rs 
b/crates/fluss/tests/integration/kv_table.rs
index 87d90b0..0bfe4a3 100644
--- a/crates/fluss/tests/integration/kv_table.rs
+++ b/crates/fluss/tests/integration/kv_table.rs
@@ -95,7 +95,7 @@ mod kv_table_test {
 
         let test_data = [(1, "Verso", 32i64), (2, "Noco", 25), (3, "Esquie", 
35)];
 
-        // Upsert rows
+        // Upsert rows (fire-and-forget, then flush)
         for (id, name, age) in &test_data {
             let mut row = GenericRow::new(3);
             row.set_field(0, *id);
@@ -106,6 +106,7 @@ mod kv_table_test {
                 .await
                 .expect("Failed to upsert row");
         }
+        upsert_writer.flush().await.expect("Failed to flush");
 
         // Lookup records
         let mut lookuper = table
@@ -130,7 +131,7 @@ mod kv_table_test {
             assert_eq!(row.get_long(2), *expected_age, "age mismatch");
         }
 
-        // Update the record with new age
+        // Update the record with new age (await acknowledgment)
         let mut updated_row = GenericRow::new(3);
         updated_row.set_field(0, 1);
         updated_row.set_field(1, "Verso");
@@ -138,7 +139,9 @@ mod kv_table_test {
         upsert_writer
             .upsert(&updated_row)
             .await
-            .expect("Failed to upsert updated row");
+            .expect("Failed to upsert updated row")
+            .await
+            .expect("Failed to wait for upsert acknowledgment");
 
         // Verify the update
         let result = lookuper
@@ -160,13 +163,15 @@ mod kv_table_test {
             "Name should remain unchanged"
         );
 
-        // Delete record with id=1
+        // Delete record with id=1 (await acknowledgment)
         let mut delete_row = GenericRow::new(3);
         delete_row.set_field(0, 1);
         upsert_writer
             .delete(&delete_row)
             .await
-            .expect("Failed to delete");
+            .expect("Failed to delete")
+            .await
+            .expect("Failed to wait for delete acknowledgment");
 
         // Verify deletion
         let result = lookuper
@@ -265,6 +270,7 @@ mod kv_table_test {
             row.set_field(2, *score);
             upsert_writer.upsert(&row).await.expect("Failed to upsert");
         }
+        upsert_writer.flush().await.expect("Failed to flush");
 
         // Lookup with composite key
         let mut lookuper = table
@@ -295,7 +301,7 @@ mod kv_table_test {
             .expect("Row should exist");
         assert_eq!(row.get_long(2), 250, "Score for (EU, 2) should be 250");
 
-        // Update (US, 1) score
+        // Update (US, 1) score (await acknowledgment)
         let mut update_row = GenericRow::new(3);
         update_row.set_field(0, "US");
         update_row.set_field(1, 1);
@@ -303,7 +309,9 @@ mod kv_table_test {
         upsert_writer
             .upsert(&update_row)
             .await
-            .expect("Failed to update");
+            .expect("Failed to update")
+            .await
+            .expect("Failed to wait for update acknowledgment");
 
         // Verify update
         let mut key = GenericRow::new(3);
@@ -372,7 +380,9 @@ mod kv_table_test {
         upsert_writer
             .upsert(&row)
             .await
-            .expect("Failed to upsert initial row");
+            .expect("Failed to upsert initial row")
+            .await
+            .expect("Failed to wait for upsert acknowledgment");
 
         // Verify initial record
         let mut lookuper = table
@@ -403,7 +413,7 @@ mod kv_table_test {
             .create_writer()
             .expect("Failed to create UpsertWriter with partial write");
 
-        // Update only the score column
+        // Update only the score column (await acknowledgment)
         let mut partial_row = GenericRow::new(4);
         partial_row.set_field(0, 1);
         partial_row.set_field(1, Datum::Null); // not in partial update column
@@ -412,7 +422,9 @@ mod kv_table_test {
         partial_writer
             .upsert(&partial_row)
             .await
-            .expect("Failed to upsert");
+            .expect("Failed to upsert")
+            .await
+            .expect("Failed to wait for upsert acknowledgment");
 
         // Verify partial update - name and age should remain unchanged
         let result = lookuper
@@ -499,6 +511,7 @@ mod kv_table_test {
             row.set_field(3, *score);
             upsert_writer.upsert(&row).await.expect("Failed to upsert");
         }
+        upsert_writer.flush().await.expect("Failed to flush");
 
         // Create lookuper
         let mut lookuper = table
@@ -525,7 +538,7 @@ mod kv_table_test {
             assert_eq!(row.get_long(3), *expected_score, "score mismatch");
         }
 
-        // Test update within a partition
+        // Test update within a partition (await acknowledgment)
         let mut updated_row = GenericRow::new(4);
         updated_row.set_field(0, "US");
         updated_row.set_field(1, 1);
@@ -534,7 +547,9 @@ mod kv_table_test {
         upsert_writer
             .upsert(&updated_row)
             .await
-            .expect("Failed to upsert updated row");
+            .expect("Failed to upsert updated row")
+            .await
+            .expect("Failed to wait for upsert acknowledgment");
 
         // Verify the update
         let mut key = GenericRow::new(4);
@@ -564,14 +579,16 @@ mod kv_table_test {
             "Lookup in non-existent partition should return None"
         );
 
-        // Delete a record within a partition
+        // Delete a record within a partition (await acknowledgment)
         let mut delete_key = GenericRow::new(4);
         delete_key.set_field(0, "EU");
         delete_key.set_field(1, 1);
         upsert_writer
             .delete(&delete_key)
             .await
-            .expect("Failed to delete");
+            .expect("Failed to delete")
+            .await
+            .expect("Failed to wait for delete acknowledgment");
 
         // Verify deletion
         let mut key = GenericRow::new(4);
@@ -705,7 +722,9 @@ mod kv_table_test {
         upsert_writer
             .upsert(&row)
             .await
-            .expect("Failed to upsert row with all datatypes");
+            .expect("Failed to upsert row with all datatypes")
+            .await
+            .expect("Failed to wait for upsert acknowledgment");
 
         // Lookup the record
         let mut lookuper = table
@@ -808,7 +827,9 @@ mod kv_table_test {
         upsert_writer
             .upsert(&row_with_nulls)
             .await
-            .expect("Failed to upsert row with nulls");
+            .expect("Failed to upsert row with nulls")
+            .await
+            .expect("Failed to wait for upsert acknowledgment");
 
         // Lookup row with nulls
         let mut key2 = GenericRow::new(17);
diff --git a/crates/fluss/tests/integration/log_table.rs 
b/crates/fluss/tests/integration/log_table.rs
index 493bb34..8d7773d 100644
--- a/crates/fluss/tests/integration/log_table.rs
+++ b/crates/fluss/tests/integration/log_table.rs
@@ -109,6 +109,7 @@ mod table_test {
             .await
             .expect("Failed to append batch");
 
+        // Flush to ensure all writes are acknowledged
         append_writer.flush().await.expect("Failed to flush");
 
         // Create scanner to verify appended records
@@ -232,6 +233,9 @@ mod table_test {
             .await
             .expect("Failed to append batch");
 
+        // Flush to ensure all writes are acknowledged
+        append_writer.flush().await.expect("Failed to flush");
+
         tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
 
         let after_append_ms = Timestamp::now().as_millisecond();


Reply via email to