This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 26edb7a  chore: remove async from queueing while writing (#271)
26edb7a is described below

commit 26edb7ac2c956ee9b19def250aa876a7382e8ace
Author: Anton Borisov <[email protected]>
AuthorDate: Sat Feb 7 14:16:30 2026 +0000

    chore: remove async from queueing while writing (#271)
---
 README.md                                          |   3 +-
 bindings/cpp/src/lib.rs                            |   7 +-
 bindings/python/example/example.py                 |  33 +++---
 bindings/python/src/lib.rs                         |   3 +
 bindings/python/src/table.rs                       |  88 +++++----------
 bindings/python/src/upsert.rs                      | 124 ++++++++-------------
 bindings/python/src/write_handle.rs                |  80 +++++++++++++
 crates/examples/src/example_kv_table.rs            |   6 +-
 .../examples/src/example_partitioned_kv_table.rs   |   6 +-
 crates/examples/src/example_table.rs               |   4 +-
 crates/fluss/src/client/table/append.rs            |   8 +-
 crates/fluss/src/client/table/upsert.rs            |   8 +-
 crates/fluss/src/client/write/accumulator.rs       |  64 +++++------
 crates/fluss/src/client/write/mod.rs               |   4 +-
 crates/fluss/src/client/write/sender.rs            |  86 +++++++-------
 crates/fluss/src/client/write/writer_client.rs     |  12 +-
 crates/fluss/tests/integration/kv_table.rs         |  18 +--
 crates/fluss/tests/integration/log_table.rs        |  17 +--
 .../fluss/tests/integration/table_remote_scan.rs   |   7 +-
 19 files changed, 269 insertions(+), 309 deletions(-)

diff --git a/README.md b/README.md
index ee9478c..5e771d9 100644
--- a/README.md
+++ b/README.md
@@ -101,7 +101,8 @@ pub async fn main() -> Result<()> {
     let table = conn.get_table(&table_path).await;
     let append_writer = table.new_append().create_writer();
     let batch = record_batch!(("c1", Int32, [1, 2, 3, 4, 5, 6]), ("c2", Utf8, ["a1", "a2", "a3", "a4", "a5", "a6"])).unwrap();
-    append_writer.append(batch).await?;
+    append_writer.append(batch)?;
+    append_writer.flush().await?;
     println!("Start to scan log records......");
     // 4: scan the records
     let log_scanner = table.new_scan().create_log_scanner();
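
For callers migrating, the shape of the change above: `append` now queues synchronously and returns a `WriteResultFuture`, so only `flush` (or the returned future) is awaited. A minimal sketch of the two resulting call shapes, reusing the writer from the README snippet; `batch2` stands in for a hypothetical second batch:

    // Fire-and-forget: queue synchronously, then await a single flush.
    append_writer.append(batch)?;  // returns a WriteResultFuture, dropped here
    append_writer.flush().await?;  // waits for server acknowledgment

    // Per-record acknowledgment: await the future that append returns.
    let pending = append_writer.append(batch2)?;
    pending.await?;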
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index 7944c10..4957c99 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -761,8 +761,9 @@ impl AppendWriter {
     fn append(&mut self, row: &ffi::FfiGenericRow) -> Result<Box<WriteResult>, String> {
         let generic_row = types::ffi_row_to_core(row);
 
-        let result_future = RUNTIME
-            .block_on(async { self.inner.append(&generic_row).await })
+        let result_future = self
+            .inner
+            .append(&generic_row)
             .map_err(|e| format!("Failed to append: {e}"))?;
 
         Ok(Box::new(WriteResult {
@@ -789,7 +790,7 @@ impl WriteResult {
                 Err(e) => err_result(1, e.to_string()),
             }
         } else {
-            ok_result()
+            err_result(1, "WriteResult already consumed".to_string())
         }
     }
 }
diff --git a/bindings/python/example/example.py b/bindings/python/example/example.py
index 1cabaa5..d56879a 100644
--- a/bindings/python/example/example.py
+++ b/bindings/python/example/example.py
@@ -184,7 +184,7 @@ async def main():
         # Test 3: Append single rows with Date, Time, Timestamp, Decimal
         print("\n--- Testing single row append with temporal/decimal types 
---")
         # Dict input with all types including Date, Time, Timestamp, Decimal
-        await append_writer.append(
+        append_writer.append(
             {
                 "id": 8,
                 "name": "Helen",
@@ -200,7 +200,7 @@ async def main():
         print("Successfully appended row (dict with Date, Time, Timestamp, 
Decimal)")
 
         # List input with all types
-        await append_writer.append(
+        append_writer.append(
             [
                 9,
                 "Ivan",
@@ -242,7 +242,7 @@ async def main():
 
         # Flush all pending data
         print("\n--- Flushing data ---")
-        append_writer.flush()
+        await append_writer.flush()
         print("Successfully flushed data")
 
         # Demo: Check offsets after writes
@@ -422,9 +422,9 @@ async def main():
         upsert_writer = pk_table.new_upsert()
         print(f"Created upsert writer: {upsert_writer}")
 
-        # Fire-and-forget: queue writes without waiting for individual acks.
+        # Fire-and-forget: queue writes synchronously, flush at end.
         # Records are batched internally for efficiency.
-        await upsert_writer.upsert(
+        upsert_writer.upsert(
             {
                 "user_id": 1,
                 "name": "Alice",
@@ -441,7 +441,7 @@ async def main():
         )
         print("Queued user_id=1 (Alice)")
 
-        await upsert_writer.upsert(
+        upsert_writer.upsert(
             {
                 "user_id": 2,
                 "name": "Bob",
@@ -456,7 +456,7 @@ async def main():
         )
         print("Queued user_id=2 (Bob)")
 
-        await upsert_writer.upsert(
+        upsert_writer.upsert(
             {
                 "user_id": 3,
                 "name": "Charlie",
@@ -479,7 +479,7 @@ async def main():
         # the server confirms this specific write, useful when you need to
         # read-after-write or verify critical updates.
         print("\n--- Testing Upsert (per-record acknowledgment) ---")
-        ack = await upsert_writer.upsert(
+        handle = upsert_writer.upsert(
             {
                 "user_id": 1,
                 "name": "Alice Updated",
@@ -494,7 +494,7 @@ async def main():
                 "balance": Decimal("2345.67"),
             }
         )
-        await ack  # wait for server acknowledgment before proceeding
+        await handle.wait()  # wait for server acknowledgment
         print("Updated user_id=1 (Alice -> Alice Updated) — server 
acknowledged")
 
     except Exception as e:
@@ -554,9 +554,8 @@ async def main():
     try:
         upsert_writer = pk_table.new_upsert()
 
-        # Per-record ack for delete — await the handle to confirm deletion
-        ack = await upsert_writer.delete({"user_id": 3})
-        await ack
+        handle = upsert_writer.delete({"user_id": 3})
+        await handle.wait()
         print("Deleted user_id=3 — server acknowledged")
 
         lookuper = pk_table.new_lookup()
@@ -670,12 +669,12 @@ async def main():
         partitioned_writer = await partitioned_table.new_append_writer()
 
         # Append data to US partition
-        await partitioned_writer.append({"id": 1, "region": "US", "value": 
100})
-        await partitioned_writer.append({"id": 2, "region": "US", "value": 
200})
+        partitioned_writer.append({"id": 1, "region": "US", "value": 100})
+        partitioned_writer.append({"id": 2, "region": "US", "value": 200})
         # Append data to EU partition
-        await partitioned_writer.append({"id": 3, "region": "EU", "value": 
300})
-        await partitioned_writer.append({"id": 4, "region": "EU", "value": 
400})
-        partitioned_writer.flush()
+        partitioned_writer.append({"id": 3, "region": "EU", "value": 300})
+        partitioned_writer.append({"id": 4, "region": "EU", "value": 400})
+        await partitioned_writer.flush()
         print("\nWrote 4 records (2 to US, 2 to EU)")
 
         # Demo: list_partition_offsets
diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs
index ae7f6c5..f1f4ee6 100644
--- a/bindings/python/src/lib.rs
+++ b/bindings/python/src/lib.rs
@@ -30,6 +30,7 @@ mod metadata;
 mod table;
 mod upsert;
 mod utils;
+mod write_handle;
 
 pub use admin::*;
 pub use config::*;
@@ -40,6 +41,7 @@ pub use metadata::*;
 pub use table::*;
 pub use upsert::*;
 pub use utils::*;
+pub use write_handle::*;
 
 static TOKIO_RUNTIME: LazyLock<Runtime> = LazyLock::new(|| {
     tokio::runtime::Builder::new_multi_thread()
@@ -88,6 +90,7 @@ fn _fluss(m: &Bound<'_, PyModule>) -> PyResult<()> {
     m.add_class::<RecordBatch>()?;
     m.add_class::<PartitionInfo>()?;
     m.add_class::<OffsetType>()?;
+    m.add_class::<WriteResultHandle>()?;
 
     // Register constants
     m.add("EARLIEST_OFFSET", fcore::client::EARLIEST_OFFSET)?;
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index e987d43..8af6b13 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -533,76 +533,43 @@ impl AppendWriter {
         let batch_list: Vec<Py<PyAny>> = batches.extract(py)?;
 
         for batch in batch_list {
-            // Drop the ack coroutine — fire-and-forget
-            let _ = self.write_arrow_batch(py, batch)?;
+            // Drop the handle — fire-and-forget for bulk writes
+            drop(self.write_arrow_batch(py, batch)?);
         }
         Ok(())
     }
 
-    /// Write Arrow batch data
+    /// Write Arrow batch data.
     ///
     /// Returns:
-    ///     A coroutine that can be awaited for server acknowledgment,
-    ///     or ignored for fire-and-forget behavior.
-    pub fn write_arrow_batch<'py>(
-        &self,
-        py: Python<'py>,
-        batch: Py<PyAny>,
-    ) -> PyResult<Bound<'py, PyAny>> {
+    ///     WriteResultHandle that can be ignored (fire-and-forget) or
+    ///     awaited via `handle.wait()` for server acknowledgment.
+    pub fn write_arrow_batch(&self, py: Python, batch: Py<PyAny>) -> PyResult<WriteResultHandle> {
         // This shares the underlying Arrow buffers without copying data
         let batch_bound = batch.bind(py);
         let rust_batch: ArrowRecordBatch = FromPyArrow::from_pyarrow_bound(batch_bound)
             .map_err(|e| FlussError::new_err(format!("Failed to convert RecordBatch: {e}")))?;
 
-        let inner = self.inner.clone();
-
-        future_into_py(py, async move {
-            let result_future = inner
-                .append_arrow_batch(rust_batch)
-                .await
-                .map_err(|e| FlussError::new_err(e.to_string()))?;
-
-            Python::attach(|py| {
-                future_into_py(py, async move {
-                    result_future
-                        .await
-                        .map_err(|e| FlussError::new_err(e.to_string()))?;
-                    Ok(())
-                })
-                .map(|bound| bound.unbind())
-            })
-        })
+        let result_future = self
+            .inner
+            .append_arrow_batch(rust_batch)
+            .map_err(|e| FlussError::new_err(e.to_string()))?;
+        Ok(WriteResultHandle::new(result_future))
     }
 
-    /// Append a single row to the table
+    /// Append a single row to the table.
     ///
     /// Returns:
-    ///     A coroutine that can be awaited for server acknowledgment,
-    ///     or ignored for fire-and-forget behavior.
-    pub fn append<'py>(
-        &self,
-        py: Python<'py>,
-        row: &Bound<'py, PyAny>,
-    ) -> PyResult<Bound<'py, PyAny>> {
+    ///     WriteResultHandle that can be ignored (fire-and-forget) or
+    ///     awaited via `handle.wait()` for server acknowledgment.
+    pub fn append(&self, row: &Bound<'_, PyAny>) -> PyResult<WriteResultHandle> {
         let generic_row = python_to_generic_row(row, &self.table_info)?;
-        let inner = self.inner.clone();
-
-        future_into_py(py, async move {
-            let result_future = inner
-                .append(&generic_row)
-                .await
-                .map_err(|e| FlussError::new_err(e.to_string()))?;
 
-            Python::attach(|py| {
-                future_into_py(py, async move {
-                    result_future
-                        .await
-                        .map_err(|e| FlussError::new_err(e.to_string()))?;
-                    Ok(())
-                })
-                .map(|bound| bound.unbind())
-            })
-        })
+        let result_future = self
+            .inner
+            .append(&generic_row)
+            .map_err(|e| FlussError::new_err(e.to_string()))?;
+        Ok(WriteResultHandle::new(result_future))
     }
 
     /// Write Pandas DataFrame data
@@ -636,16 +603,13 @@ impl AppendWriter {
     }
 
     /// Flush any pending data
-    pub fn flush(&self, py: Python) -> PyResult<()> {
+    pub fn flush<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
         let inner = self.inner.clone();
-        // Release the GIL before blocking on I/O
-        py.detach(|| {
-            TOKIO_RUNTIME.block_on(async {
-                inner
-                    .flush()
-                    .await
-                    .map_err(|e| FlussError::new_err(e.to_string()))
-            })
+        future_into_py(py, async move {
+            inner
+                .flush()
+                .await
+                .map_err(|e| FlussError::new_err(e.to_string()))
         })
     }
 
diff --git a/bindings/python/src/upsert.rs b/bindings/python/src/upsert.rs
index 5c10ded..0aa69d7 100644
--- a/bindings/python/src/upsert.rs
+++ b/bindings/python/src/upsert.rs
@@ -18,26 +18,24 @@
 use crate::table::{python_pk_to_generic_row, python_to_generic_row};
 use crate::*;
 use pyo3_async_runtimes::tokio::future_into_py;
-use std::sync::Arc;
-use tokio::sync::Mutex;
+use std::sync::{Arc, Mutex};
 
 /// Writer for upserting and deleting data in a Fluss primary key table.
 ///
-/// Each upsert/delete operation queues the write and returns a coroutine
-/// that can be awaited for per-record acknowledgment, or ignored for
-/// fire-and-forget semantics (call `flush()` to ensure delivery).
+/// Each upsert/delete operation synchronously queues the write. Call `flush()`
+/// to ensure all queued writes are delivered to the server.
 ///
 /// # Example:
 ///     writer = table.new_upsert()
 ///
-///     # Fire-and-forget with flush
-///     await writer.upsert(row1)
-///     await writer.upsert(row2)
+///     # Fire-and-forget — ignore the returned handle
+///     writer.upsert(row1)
+///     writer.upsert(row2)
 ///     await writer.flush()
 ///
-///     # Or await individual acknowledgment
-///     ack = await writer.upsert(row3)
-///     await ack
+///     # Per-record ack — call wait() on the handle
+///     handle = writer.upsert(critical_row)
+///     await handle.wait()
 #[pyclass]
 pub struct UpsertWriter {
     inner: Arc<UpsertWriterInner>,
@@ -46,7 +44,7 @@ pub struct UpsertWriter {
 struct UpsertWriterInner {
     table_upsert: fcore::client::TableUpsert,
     /// Lazily initialized writer - created on first write operation
-    writer: Mutex<Option<fcore::client::UpsertWriter>>,
+    writer: Mutex<Option<Arc<fcore::client::UpsertWriter>>>,
     table_info: fcore::metadata::TableInfo,
 }
 
@@ -57,100 +55,65 @@ impl UpsertWriter {
     /// If a row with the same primary key exists, it will be updated.
     /// Otherwise, a new row will be inserted.
     ///
+    /// The write is queued synchronously. Call `flush()` to ensure delivery.
+    ///
     /// Args:
     ///     row: A dict, list, or tuple containing the row data.
     ///          For dict: keys are column names, values are column values.
     ///          For list/tuple: values must be in schema order.
-    ///
-    /// Returns:
-    ///     A coroutine that can be awaited for server acknowledgment,
-    ///     or ignored for fire-and-forget behavior.
-    pub fn upsert<'py>(
-        &self,
-        py: Python<'py>,
-        row: &Bound<'_, PyAny>,
-    ) -> PyResult<Bound<'py, PyAny>> {
+    pub fn upsert(&self, row: &Bound<'_, PyAny>) -> PyResult<WriteResultHandle> {
         let generic_row = python_to_generic_row(row, &self.inner.table_info)?;
-        let inner = self.inner.clone();
-
-        future_into_py(py, async move {
-            let mut guard = inner.get_or_create_writer().await?;
-            let writer = guard.as_mut().unwrap();
-            let result_future = writer
-                .upsert(&generic_row)
-                .await
-                .map_err(|e| FlussError::new_err(e.to_string()))?;
 
-            Python::attach(|py| {
-                future_into_py(py, async move {
-                    result_future
-                        .await
-                        .map_err(|e| FlussError::new_err(e.to_string()))?;
-                    Ok(())
-                })
-                .map(|bound| bound.unbind())
-            })
-        })
+        let writer = self.inner.get_or_create_writer()?;
+        let result_future = writer
+            .upsert(&generic_row)
+            .map_err(|e| FlussError::new_err(e.to_string()))?;
+        Ok(WriteResultHandle::new(result_future))
     }
 
     /// Delete a row from the table by primary key.
     ///
+    /// The delete is queued synchronously. Call `flush()` to ensure delivery.
+    ///
     /// Args:
     ///     pk: A dict, list, or tuple containing only the primary key values.
     ///         For dict: keys are PK column names.
     ///         For list/tuple: values in PK column order.
-    ///
-    /// Returns:
-    ///     A coroutine that can be awaited for server acknowledgment,
-    ///     or ignored for fire-and-forget behavior.
-    pub fn delete<'py>(
-        &self,
-        py: Python<'py>,
-        pk: &Bound<'_, PyAny>,
-    ) -> PyResult<Bound<'py, PyAny>> {
+    pub fn delete(&self, pk: &Bound<'_, PyAny>) -> PyResult<WriteResultHandle> {
         let generic_row = python_pk_to_generic_row(pk, &self.inner.table_info)?;
-        let inner = self.inner.clone();
-
-        future_into_py(py, async move {
-            let mut guard = inner.get_or_create_writer().await?;
-            let writer = guard.as_mut().unwrap();
-            let result_future = writer
-                .delete(&generic_row)
-                .await
-                .map_err(|e| FlussError::new_err(e.to_string()))?;
 
-            Python::attach(|py| {
-                future_into_py(py, async move {
-                    result_future
-                        .await
-                        .map_err(|e| FlussError::new_err(e.to_string()))?;
-                    Ok(())
-                })
-                .map(|bound| bound.unbind())
-            })
-        })
+        let writer = self.inner.get_or_create_writer()?;
+        let result_future = writer
+            .delete(&generic_row)
+            .map_err(|e| FlussError::new_err(e.to_string()))?;
+        Ok(WriteResultHandle::new(result_future))
     }
 
     /// Flush all pending upsert/delete operations to the server.
     ///
-    /// This method sends all buffered operations and blocks until they are
+    /// This method sends all buffered operations and waits until they are
     /// acknowledged according to the writer's ack configuration.
     ///
     /// Returns:
     ///     None on success
     pub fn flush<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
-        let inner = self.inner.clone();
+        // Clone the Arc<UpsertWriter> out of the lock so we don't hold the guard across await
+        let writer = {
+            let guard = self
+                .inner
+                .writer
+                .lock()
+                .map_err(|e| FlussError::new_err(format!("Lock poisoned: 
{e}")))?;
+            guard.as_ref().cloned()
+        };
 
         future_into_py(py, async move {
-            let writer_guard = inner.writer.lock().await;
-
-            if let Some(writer) = writer_guard.as_ref() {
+            if let Some(writer) = writer {
                 writer
                     .flush()
                     .await
                     .map_err(|e| FlussError::new_err(e.to_string()))
             } else {
-                // Nothing to flush - no writer was created yet
                 Ok(())
             }
         })
@@ -197,17 +160,18 @@ impl UpsertWriter {
 
 impl UpsertWriterInner {
     /// Get the cached writer or create one on first use.
-    async fn get_or_create_writer(
-        &self,
-    ) -> PyResult<tokio::sync::MutexGuard<'_, Option<fcore::client::UpsertWriter>>> {
-        let mut guard = self.writer.lock().await;
+    fn get_or_create_writer(&self) -> PyResult<Arc<fcore::client::UpsertWriter>> {
+        let mut guard = self
+            .writer
+            .lock()
+            .map_err(|e| FlussError::new_err(format!("Lock poisoned: {e}")))?;
         if guard.is_none() {
             let writer = self
                 .table_upsert
                 .create_writer()
                 .map_err(|e| FlussError::new_err(e.to_string()))?;
-            *guard = Some(writer);
+            *guard = Some(Arc::new(writer));
         }
-        Ok(guard)
+        Ok(guard.as_ref().unwrap().clone())
     }
 }
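
The rewritten `flush` above leans on one pattern worth calling out: the Arc<UpsertWriter> is cloned out of the std::sync::Mutex inside a block, so the guard is dropped before any `.await` runs and the future stays Send. A generic sketch of the same pattern, with hypothetical names:

    use std::sync::{Arc, Mutex};

    async fn flush_without_holding_guard(slot: &Mutex<Option<Arc<String>>>) {
        // Clone the Arc inside a scope so the MutexGuard drops here...
        let shared = {
            let guard = slot.lock().expect("lock poisoned");
            guard.as_ref().cloned()
        };
        // ...and only then cross an await point.
        if let Some(shared) = shared {
            do_async_work(shared.as_str()).await;
        }
    }

    // Stand-in for writer.flush().await in the binding above.
    async fn do_async_work(_value: &str) {}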
diff --git a/bindings/python/src/write_handle.rs b/bindings/python/src/write_handle.rs
new file mode 100644
index 0000000..4f3ce99
--- /dev/null
+++ b/bindings/python/src/write_handle.rs
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::*;
+use pyo3_async_runtimes::tokio::future_into_py;
+use std::sync::Mutex;
+
+/// Handle for a pending write operation.
+///
+/// Returned by `upsert()`, `delete()`, `append()`, etc.
+/// Can be safely ignored for fire-and-forget semantics,
+/// or awaited via `wait()` for per-record acknowledgment.
+///
+/// # Example:
+///     # Fire-and-forget — just ignore the handle
+///     writer.upsert(row1)
+///     writer.upsert(row2)
+///     await writer.flush()
+///
+///     # Per-record ack — call wait()
+///     handle = writer.upsert(critical_row)
+///     await handle.wait()
+#[pyclass]
+pub struct WriteResultHandle {
+    inner: Mutex<Option<fcore::client::WriteResultFuture>>,
+}
+
+impl WriteResultHandle {
+    pub fn new(future: fcore::client::WriteResultFuture) -> Self {
+        Self {
+            inner: Mutex::new(Some(future)),
+        }
+    }
+}
+
+#[pymethods]
+impl WriteResultHandle {
+    /// Wait for server acknowledgment of this specific write.
+    ///
+    /// Returns:
+    ///     None on success, raises FlussError on failure.
+    pub fn wait<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
+        let future = self
+            .inner
+            .lock()
+            .map_err(|e| FlussError::new_err(format!("Lock poisoned: {e}")))?
+            .take()
+            .ok_or_else(|| FlussError::new_err("WriteResultHandle already 
consumed"))?;
+
+        future_into_py(py, async move {
+            future
+                .await
+                .map_err(|e| FlussError::new_err(e.to_string()))?;
+            Ok(())
+        })
+    }
+
+    fn __repr__(&self) -> String {
+        let consumed = self.inner.lock().map(|g| g.is_none()).unwrap_or(false);
+        if consumed {
+            "WriteResultHandle(consumed)".to_string()
+        } else {
+            "WriteResultHandle(pending)".to_string()
+        }
+    }
+}
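
WriteResultHandle stores its future in a Mutex<Option<...>> so `wait()` can be called at most once; a second call raises instead of silently succeeding. The same consume-once idea, reduced to plain Rust without the PyO3 plumbing (names are illustrative):

    use std::sync::Mutex;

    struct TakeOnce<T> {
        slot: Mutex<Option<T>>,
    }

    impl<T> TakeOnce<T> {
        fn new(value: T) -> Self {
            Self { slot: Mutex::new(Some(value)) }
        }

        // First call yields the value; later calls report consumption.
        fn take(&self) -> Result<T, &'static str> {
            self.slot
                .lock()
                .map_err(|_| "lock poisoned")?
                .take()
                .ok_or("already consumed")
        }
    }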
diff --git a/crates/examples/src/example_kv_table.rs b/crates/examples/src/example_kv_table.rs
index 2fcb134..3acf73f 100644
--- a/crates/examples/src/example_kv_table.rs
+++ b/crates/examples/src/example_kv_table.rs
@@ -62,7 +62,7 @@ pub async fn main() -> Result<()> {
         row.set_field(0, id);
         row.set_field(1, name);
         row.set_field(2, age);
-        upsert_writer.upsert(&row).await?;
+        upsert_writer.upsert(&row)?;
         println!("Upserted: {row:?}");
     }
     upsert_writer.flush().await?;
@@ -85,7 +85,7 @@ pub async fn main() -> Result<()> {
     row.set_field(0, 1);
     row.set_field(1, "Verso");
     row.set_field(2, 33i64);
-    upsert_writer.upsert(&row).await?.await?;
+    upsert_writer.upsert(&row)?.await?;
     println!("Updated: {row:?}");
 
     let result = lookuper.lookup(&make_key(1)).await?;
@@ -100,7 +100,7 @@ pub async fn main() -> Result<()> {
     // For delete, only primary key field needs to be set; other fields can remain null
     let mut row = GenericRow::new(3);
     row.set_field(0, 2);
-    upsert_writer.delete(&row).await?.await?;
+    upsert_writer.delete(&row)?.await?;
     println!("Deleted row with id=2");
 
     let result = lookuper.lookup(&make_key(2)).await?;
diff --git a/crates/examples/src/example_partitioned_kv_table.rs b/crates/examples/src/example_partitioned_kv_table.rs
index feb8f05..ee1f541 100644
--- a/crates/examples/src/example_partitioned_kv_table.rs
+++ b/crates/examples/src/example_partitioned_kv_table.rs
@@ -74,7 +74,7 @@ pub async fn main() -> Result<()> {
         row.set_field(1, region);
         row.set_field(2, zone);
         row.set_field(3, score);
-        upsert_writer.upsert(&row).await?;
+        upsert_writer.upsert(&row)?;
         println!("Upserted: {row:?}");
     }
     upsert_writer.flush().await?;
@@ -102,7 +102,7 @@ pub async fn main() -> Result<()> {
     row.set_field(1, "APAC");
     row.set_field(2, 1i64);
     row.set_field(3, 4321i64);
-    upsert_writer.upsert(&row).await?.await?;
+    upsert_writer.upsert(&row)?.await?;
     println!("Updated: {row:?}");
 
     let result = lookuper.lookup(&make_key(1001, "APAC", 1)).await?;
@@ -118,7 +118,7 @@ pub async fn main() -> Result<()> {
     row.set_field(0, 1002);
     row.set_field(1, "EMEA");
     row.set_field(2, 2i64);
-    upsert_writer.delete(&row).await?.await?;
+    upsert_writer.delete(&row)?.await?;
     println!("Deleted: {row:?}");
 
     let result = lookuper.lookup(&make_key(1002, "EMEA", 2)).await?;
diff --git a/crates/examples/src/example_table.rs b/crates/examples/src/example_table.rs
index ee9bc7b..199fce2 100644
--- a/crates/examples/src/example_table.rs
+++ b/crates/examples/src/example_table.rs
@@ -64,12 +64,12 @@ pub async fn main() -> Result<()> {
     let table = conn.get_table(&table_path).await?;
     let append_writer = table.new_append()?.create_writer()?;
     // Fire-and-forget: queue writes then flush
-    append_writer.append(&row).await?;
+    append_writer.append(&row)?;
     let mut row = GenericRow::new(3);
     row.set_field(0, 233333);
     row.set_field(1, "tt44");
     row.set_field(2, 987_654_321_987i64);
-    append_writer.append(&row).await?;
+    append_writer.append(&row)?;
     append_writer.flush().await?;
 
     // scan rows
diff --git a/crates/fluss/src/client/table/append.rs b/crates/fluss/src/client/table/append.rs
index e26b61a..942253f 100644
--- a/crates/fluss/src/client/table/append.rs
+++ b/crates/fluss/src/client/table/append.rs
@@ -80,7 +80,7 @@ impl AppendWriter {
     /// # Returns
     /// A [`WriteResultFuture`] that can be awaited to wait for server acknowledgment,
     /// or dropped for fire-and-forget behavior (use `flush()` to ensure delivery).
-    pub async fn append<R: InternalRow>(&self, row: &R) -> Result<WriteResultFuture> {
+    pub fn append<R: InternalRow>(&self, row: &R) -> Result<WriteResultFuture> {
         let physical_table_path = Arc::new(get_physical_path(
             &self.table_path,
             self.partition_getter.as_ref(),
@@ -92,7 +92,7 @@ impl AppendWriter {
             self.table_info.schema_id,
             row,
         );
-        let result_handle = self.writer_client.send(&record).await?;
+        let result_handle = self.writer_client.send(&record)?;
         Ok(WriteResultFuture::new(result_handle))
     }
 
@@ -107,7 +107,7 @@ impl AppendWriter {
     /// # Returns
     /// A [`WriteResultFuture`] that can be awaited to wait for server acknowledgment,
     /// or dropped for fire-and-forget behavior (use `flush()` to ensure delivery).
-    pub async fn append_arrow_batch(&self, batch: RecordBatch) -> Result<WriteResultFuture> {
+    pub fn append_arrow_batch(&self, batch: RecordBatch) -> Result<WriteResultFuture> {
         let physical_table_path = if self.partition_getter.is_some() && batch.num_rows() > 0 {
             let first_row = ColumnarRow::new(Arc::new(batch.clone()));
             Arc::new(get_physical_path(
@@ -125,7 +125,7 @@ impl AppendWriter {
             self.table_info.schema_id,
             batch,
         );
-        let result_handle = self.writer_client.send(&record).await?;
+        let result_handle = self.writer_client.send(&record)?;
         Ok(WriteResultFuture::new(result_handle))
     }
 
diff --git a/crates/fluss/src/client/table/upsert.rs b/crates/fluss/src/client/table/upsert.rs
index a1646cc..7057b90 100644
--- a/crates/fluss/src/client/table/upsert.rs
+++ b/crates/fluss/src/client/table/upsert.rs
@@ -354,7 +354,7 @@ impl UpsertWriter {
     /// # Returns
     /// A [`WriteResultFuture`] that can be awaited to wait for server acknowledgment,
     /// or dropped for fire-and-forget behavior (use `flush()` to ensure delivery).
-    pub async fn upsert<R: InternalRow>(&self, row: &R) -> Result<WriteResultFuture> {
+    pub fn upsert<R: InternalRow>(&self, row: &R) -> Result<WriteResultFuture> {
         self.check_field_count(row)?;
 
         let (key, bucket_key) = self.get_keys(row)?;
@@ -379,7 +379,7 @@ impl UpsertWriter {
             Some(row_bytes),
         );
 
-        let result_handle = self.writer_client.send(&write_record).await?;
+        let result_handle = self.writer_client.send(&write_record)?;
         Ok(WriteResultFuture::new(result_handle))
     }
 
@@ -395,7 +395,7 @@ impl UpsertWriter {
     /// # Returns
     /// A [`WriteResultFuture`] that can be awaited to wait for server acknowledgment,
     /// or dropped for fire-and-forget behavior (use `flush()` to ensure delivery).
-    pub async fn delete<R: InternalRow>(&self, row: &R) -> Result<WriteResultFuture> {
+    pub fn delete<R: InternalRow>(&self, row: &R) -> Result<WriteResultFuture> {
         self.check_field_count(row)?;
 
         let (key, bucket_key) = self.get_keys(row)?;
@@ -415,7 +415,7 @@ impl UpsertWriter {
             None,
         );
 
-        let result_handle = self.writer_client.send(&write_record).await?;
+        let result_handle = self.writer_client.send(&write_record)?;
         Ok(WriteResultFuture::new(result_handle))
     }
 }
diff --git a/crates/fluss/src/client/write/accumulator.rs b/crates/fluss/src/client/write/accumulator.rs
index 5eae868..2c36452 100644
--- a/crates/fluss/src/client/write/accumulator.rs
+++ b/crates/fluss/src/client/write/accumulator.rs
@@ -25,11 +25,11 @@ use crate::metadata::{PhysicalTablePath, TableBucket};
 use crate::util::current_time_ms;
 use crate::{BucketId, PartitionId, TableId};
 use dashmap::DashMap;
+use parking_lot::Mutex;
 use parking_lot::RwLock;
 use std::collections::{HashMap, HashSet, VecDeque};
 use std::sync::Arc;
 use std::sync::atomic::{AtomicI32, AtomicI64, Ordering};
-use tokio::sync::Mutex;
 
 // Type alias to simplify complex nested types
 type BucketBatches = Vec<(BucketId, Arc<Mutex<VecDeque<WriteBatch>>>)>;
@@ -144,7 +144,7 @@ impl RecordAccumulator {
         ))
     }
 
-    pub async fn append(
+    pub fn append(
         &self,
         record: &WriteRecord<'_>,
         bucket_id: BucketId,
@@ -180,7 +180,7 @@ impl RecordAccumulator {
                 .clone()
         };
 
-        let mut dq_guard = dq.lock().await;
+        let mut dq_guard = dq.lock();
         if let Some(append_result) = self.try_append(record, &mut dq_guard)? {
             return Ok(append_result);
         }
@@ -193,7 +193,7 @@ impl RecordAccumulator {
         self.append_new_batch(cluster, record, &mut dq_guard)
     }
 
-    pub async fn ready(&self, cluster: &Arc<Cluster>) -> Result<ReadyCheckResult> {
+    pub fn ready(&self, cluster: &Arc<Cluster>) -> Result<ReadyCheckResult> {
         // Snapshot just the Arcs we need, avoiding cloning the entire BucketAndWriteBatches struct
         let entries: Vec<(Arc<PhysicalTablePath>, Option<PartitionId>, BucketBatches)> = self
             .write_batches
@@ -216,18 +216,16 @@ impl RecordAccumulator {
         let mut unknown_leader_tables = HashSet::new();
 
         for (physical_table_path, mut partition_id, bucket_batches) in entries {
-            next_ready_check_delay_ms = self
-                .bucket_ready(
-                    &physical_table_path,
-                    physical_table_path.get_partition_name().is_some(),
-                    &mut partition_id,
-                    bucket_batches,
-                    &mut ready_nodes,
-                    &mut unknown_leader_tables,
-                    cluster,
-                    next_ready_check_delay_ms,
-                )
-                .await?
+            next_ready_check_delay_ms = self.bucket_ready(
+                &physical_table_path,
+                physical_table_path.get_partition_name().is_some(),
+                &mut partition_id,
+                bucket_batches,
+                &mut ready_nodes,
+                &mut unknown_leader_tables,
+                cluster,
+                next_ready_check_delay_ms,
+            )?
         }
 
         Ok(ReadyCheckResult {
@@ -238,7 +236,7 @@ impl RecordAccumulator {
     }
 
     #[allow(clippy::too_many_arguments)]
-    async fn bucket_ready(
+    fn bucket_ready(
         &self,
         physical_table_path: &Arc<PhysicalTablePath>,
         is_partitioned_table: bool,
@@ -274,7 +272,7 @@ impl RecordAccumulator {
         }
 
         for (bucket_id, batch) in bucket_batches {
-            let batch_guard = batch.lock().await;
+            let batch_guard = batch.lock();
             if batch_guard.is_empty() {
                 continue;
             }
@@ -316,7 +314,7 @@ impl RecordAccumulator {
         next_ready_check_delay_ms
     }
 
-    pub async fn drain(
+    pub fn drain(
         &self,
         cluster: Arc<Cluster>,
         nodes: &HashSet<ServerNode>,
@@ -327,9 +325,7 @@ impl RecordAccumulator {
         }
         let mut batches = HashMap::new();
         for node in nodes {
-            let ready = self
-                .drain_batches_for_one_node(&cluster, node, max_size)
-                .await?;
+            let ready = self.drain_batches_for_one_node(&cluster, node, max_size)?;
             if !ready.is_empty() {
                 batches.insert(node.id(), ready);
             }
@@ -338,7 +334,7 @@ impl RecordAccumulator {
         Ok(batches)
     }
 
-    async fn drain_batches_for_one_node(
+    fn drain_batches_for_one_node(
         &self,
         cluster: &Cluster,
         node: &ServerNode,
@@ -352,15 +348,13 @@ impl RecordAccumulator {
             return Ok(ready);
         }
 
-        // Get the start index without holding the lock across awaits
         let start = {
-            let mut nodes_drain_index_guard = self.nodes_drain_index.lock().await;
+            let mut nodes_drain_index_guard = self.nodes_drain_index.lock();
             let drain_index = nodes_drain_index_guard.entry(node.id()).or_insert(0);
             *drain_index % buckets.len()
         };
 
         let mut current_index = start;
-        // Assigned at the start of each loop iteration (line 323), used after loop (line 376)
         let mut last_processed_index;
 
         loop {
@@ -383,7 +377,7 @@ impl RecordAccumulator {
             if let Some(deque) = deque {
                 let mut maybe_batch = None;
                 {
-                    let mut batch_lock = deque.lock().await;
+                    let mut batch_lock = deque.lock();
                     if !batch_lock.is_empty() {
                         let first_batch = batch_lock.front().unwrap();
 
@@ -419,7 +413,7 @@ impl RecordAccumulator {
 
         // Store the last processed index to maintain round-robin fairness
         {
-            let mut nodes_drain_index_guard = self.nodes_drain_index.lock().await;
+            let mut nodes_drain_index_guard = self.nodes_drain_index.lock();
             nodes_drain_index_guard.insert(node.id(), last_processed_index);
         }
 
@@ -430,7 +424,7 @@ impl RecordAccumulator {
         self.incomplete_batches.write().remove(&batch_id);
     }
 
-    pub async fn re_enqueue(&self, ready_write_batch: ReadyWriteBatch) {
+    pub fn re_enqueue(&self, ready_write_batch: ReadyWriteBatch) {
         ready_write_batch.write_batch.re_enqueued();
         let physical_table_path = ready_write_batch.write_batch.physical_table_path();
         let bucket_id = ready_write_batch.table_bucket.bucket_id();
@@ -456,7 +450,7 @@ impl RecordAccumulator {
                 .clone()
         };
 
-        let mut dq_guard = dq.lock().await;
+        let mut dq_guard = dq.lock();
         dq_guard.push_front(ready_write_batch.write_batch);
     }
 
@@ -604,20 +598,18 @@ mod tests {
         };
         let record = WriteRecord::for_append(table_info, physical_table_path, 1, &row);
 
-        accumulator.append(&record, 0, &cluster, false).await?;
+        accumulator.append(&record, 0, &cluster, false)?;
 
         let server = cluster.get_tablet_server(1).expect("server");
         let nodes = HashSet::from([server.clone()]);
-        let mut batches = accumulator
-            .drain(cluster.clone(), &nodes, 1024 * 1024)
-            .await?;
+        let mut batches = accumulator.drain(cluster.clone(), &nodes, 1024 * 1024)?;
         let mut drained = batches.remove(&1).expect("drained batches");
         let batch = drained.pop().expect("batch");
         assert_eq!(batch.write_batch.attempts(), 0);
 
-        accumulator.re_enqueue(batch).await;
+        accumulator.re_enqueue(batch);
 
-        let mut batches = accumulator.drain(cluster, &nodes, 1024 * 1024).await?;
+        let mut batches = accumulator.drain(cluster, &nodes, 1024 * 1024)?;
         let mut drained = batches.remove(&1).expect("drained batches");
         let batch = drained.pop().expect("batch");
         assert_eq!(batch.write_batch.attempts(), 1);
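
With every accumulator method now synchronous, the per-bucket deques move from tokio::sync::Mutex (locked via `lock().await`) to parking_lot::Mutex: no guard is held across an await point anymore, so a blocking lock is safe and avoids the async-mutex overhead. A reduced sketch of the resulting locking shape, with illustrative names:

    use parking_lot::Mutex;
    use std::collections::VecDeque;

    struct BucketQueue {
        // Previously tokio::sync::Mutex<VecDeque<..>>, locked with .lock().await.
        batches: Mutex<VecDeque<u64>>,
    }

    impl BucketQueue {
        // Synchronous enqueue: the guard lives only for this short critical
        // section and never crosses an await point.
        fn enqueue(&self, batch_id: u64) {
            self.batches.lock().push_back(batch_id);
        }

        fn dequeue(&self) -> Option<u64> {
            self.batches.lock().pop_front()
        }
    }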
diff --git a/crates/fluss/src/client/write/mod.rs b/crates/fluss/src/client/write/mod.rs
index 49eff05..2c848d3 100644
--- a/crates/fluss/src/client/write/mod.rs
+++ b/crates/fluss/src/client/write/mod.rs
@@ -205,8 +205,8 @@ impl ResultHandle {
 /// A future that represents a pending write operation.
 ///
 /// This type implements [`Future`], allowing users to either:
-/// 1. Await immediately to block on acknowledgment: `writer.upsert(&row).await?.await?`
-/// 2. Fire-and-forget with later flush: `writer.upsert(&row).await?; writer.flush().await?`
+/// 1. Await immediately to block on acknowledgment: `writer.upsert(&row)?.await?`
+/// 2. Fire-and-forget with later flush: `writer.upsert(&row)?; writer.flush().await?`
 ///
 /// This pattern is similar to rdkafka's `DeliveryFuture` and allows for efficient batching
 /// when users don't need immediate per-record acknowledgment.
diff --git a/crates/fluss/src/client/write/sender.rs b/crates/fluss/src/client/write/sender.rs
index f336d0c..069f2d2 100644
--- a/crates/fluss/src/client/write/sender.rs
+++ b/crates/fluss/src/client/write/sender.rs
@@ -78,7 +78,7 @@ impl Sender {
 
     async fn run_once(&self) -> Result<()> {
         let cluster = self.metadata.get_cluster();
-        let ready_check_result = self.accumulator.ready(&cluster).await?;
+        let ready_check_result = self.accumulator.ready(&cluster)?;
 
         // Update metadata if needed
         if !ready_check_result.unknown_leader_tables.is_empty() {
@@ -124,14 +124,11 @@ impl Sender {
             return Ok(());
         }
 
-        let batches = self
-            .accumulator
-            .drain(
-                cluster.clone(),
-                &ready_check_result.ready_nodes,
-                self.max_request_size,
-            )
-            .await?;
+        let batches = self.accumulator.drain(
+            cluster.clone(),
+            &ready_check_result.ready_nodes,
+            self.max_request_size,
+        )?;
 
         if !batches.is_empty() {
             self.add_to_inflight_batches(&batches);
@@ -233,8 +230,7 @@ impl Sender {
                     self.handle_batches_with_local_error(
                         request_batches,
                         format!("Failed to build write request: {e}"),
-                    )
-                    .await?;
+                    )?;
                     continue;
                 }
             };
@@ -377,9 +373,8 @@ impl Sender {
                         .error_message()
                         .cloned()
                         .unwrap_or_else(|| error.message().to_string());
-                    if let Some(physical_table_path) = self
-                        .handle_write_batch_error(ready_batch, error, message)
-                        .await?
+                    if let Some(physical_table_path) =
+                        self.handle_write_batch_error(ready_batch, error, message)?
                     {
                         invalid_metadata_tables
                             .insert(physical_table_path.get_table_path().clone());
@@ -392,14 +387,11 @@ impl Sender {
 
         for bucket in pending_buckets {
             if let Some(ready_batch) = records_by_bucket.remove(&bucket) {
-                if let Some(physical_table_path) = self
-                    .handle_write_batch_error(
-                        ready_batch,
-                        FlussError::UnknownServerError,
-                        format!("Missing response for table bucket {bucket}"),
-                    )
-                    .await?
-                {
+                if let Some(physical_table_path) = self.handle_write_batch_error(
+                    ready_batch,
+                    FlussError::UnknownServerError,
+                    format!("Missing response for table bucket {bucket}"),
+                )? {
                    invalid_metadata_tables.insert(physical_table_path.get_table_path().clone());
                     invalid_physical_table_paths.insert(physical_table_path);
                 }
@@ -438,9 +430,8 @@ impl Sender {
         let mut invalid_physical_table_paths: HashSet<Arc<PhysicalTablePath>> = HashSet::new();
 
         for batch in batches {
-            if let Some(physical_table_path) = self
-                .handle_write_batch_error(batch, error, message.clone())
-                .await?
+            if let Some(physical_table_path) =
+                self.handle_write_batch_error(batch, error, message.clone())?
             {
                invalid_metadata_tables.insert(physical_table_path.get_table_path().clone());
                 invalid_physical_table_paths.insert(physical_table_path);
@@ -451,7 +442,7 @@ impl Sender {
         Ok(())
     }
 
-    async fn handle_batches_with_local_error(
+    fn handle_batches_with_local_error(
         &self,
         batches: Vec<ReadyWriteBatch>,
         message: String,
@@ -467,7 +458,7 @@ impl Sender {
         Ok(())
     }
 
-    async fn handle_write_batch_error(
+    fn handle_write_batch_error(
         &self,
         ready_write_batch: ReadyWriteBatch,
         error: FlussError,
@@ -480,7 +471,7 @@ impl Sender {
                 physical_table_path.as_ref(),
                 ready_write_batch.table_bucket.bucket_id()
             );
-            self.re_enqueue_batch(ready_write_batch).await;
+            self.re_enqueue_batch(ready_write_batch);
             return Ok(Self::is_invalid_metadata_error(error).then_some(physical_table_path));
         }
 
@@ -504,9 +495,9 @@ impl Sender {
         Ok(Self::is_invalid_metadata_error(error).then_some(physical_table_path))
     }
 
-    async fn re_enqueue_batch(&self, ready_write_batch: ReadyWriteBatch) {
+    fn re_enqueue_batch(&self, ready_write_batch: ReadyWriteBatch) {
         self.remove_from_inflight_batches(&ready_write_batch);
-        self.accumulator.re_enqueue(ready_write_batch).await;
+        self.accumulator.re_enqueue(ready_write_batch);
     }
 
     fn remove_from_inflight_batches(&self, ready_write_batch: &ReadyWriteBatch) {
@@ -656,7 +647,7 @@ mod tests {
     use crate::test_utils::{build_cluster_arc, build_table_info};
     use std::collections::{HashMap, HashSet};
 
-    async fn build_ready_batch(
+    fn build_ready_batch(
         accumulator: &RecordAccumulator,
         cluster: Arc<Cluster>,
         table_path: Arc<TablePath>,
@@ -667,11 +658,11 @@ mod tests {
             values: vec![Datum::Int32(1)],
         };
         let record = WriteRecord::for_append(table_info, physical_table_path, 1, &row);
-        let result = accumulator.append(&record, 0, &cluster, false).await?;
+        let result = accumulator.append(&record, 0, &cluster, false)?;
         let result_handle = result.result_handle.expect("result handle");
         let server = cluster.get_tablet_server(1).expect("server");
         let nodes = HashSet::from([server.clone()]);
-        let mut batches = accumulator.drain(cluster, &nodes, 1024 * 1024).await?;
+        let mut batches = accumulator.drain(cluster, &nodes, 1024 * 1024)?;
         let mut drained = batches.remove(&1).expect("drained batches");
         let batch = drained.pop().expect("batch");
         Ok((batch, result_handle))
@@ -686,19 +677,21 @@ mod tests {
         let sender = Sender::new(metadata, accumulator.clone(), 1024 * 1024, 1000, 1, 1);
 
         let (batch, _handle) =
-            build_ready_batch(accumulator.as_ref(), cluster.clone(), table_path.clone()).await?;
+            build_ready_batch(accumulator.as_ref(), cluster.clone(), table_path.clone())?;
         let mut inflight = HashMap::new();
         inflight.insert(1, vec![batch]);
         sender.add_to_inflight_batches(&inflight);
         let batch = inflight.remove(&1).unwrap().pop().unwrap();
 
-        sender
-            .handle_write_batch_error(batch, FlussError::RequestTimeOut, "timeout".to_string())
-            .await?;
+        sender.handle_write_batch_error(
+            batch,
+            FlussError::RequestTimeOut,
+            "timeout".to_string(),
+        )?;
 
         let server = cluster.get_tablet_server(1).expect("server");
         let nodes = HashSet::from([server.clone()]);
-        let mut batches = accumulator.drain(cluster, &nodes, 1024 * 1024).await?;
+        let mut batches = accumulator.drain(cluster, &nodes, 1024 * 1024)?;
         let mut drained = batches.remove(&1).expect("drained batches");
         let batch = drained.pop().expect("batch");
         assert_eq!(batch.write_batch.attempts(), 1);
@@ -713,15 +706,12 @@ mod tests {
         let accumulator = Arc::new(RecordAccumulator::new(Config::default()));
         let sender = Sender::new(metadata, accumulator.clone(), 1024 * 1024, 1000, 1, 0);
 
-        let (batch, handle) =
-            build_ready_batch(accumulator.as_ref(), cluster.clone(), table_path).await?;
-        sender
-            .handle_write_batch_error(
-                batch,
-                FlussError::InvalidTableException,
-                "invalid".to_string(),
-            )
-            .await?;
+        let (batch, handle) = build_ready_batch(accumulator.as_ref(), cluster.clone(), table_path)?;
+        sender.handle_write_batch_error(
+            batch,
+            FlussError::InvalidTableException,
+            "invalid".to_string(),
+        )?;
 
         let batch_result = handle.wait().await?;
         assert!(matches!(
@@ -740,7 +730,7 @@ mod tests {
         let accumulator = Arc::new(RecordAccumulator::new(Config::default()));
         let sender = Sender::new(metadata, accumulator.clone(), 1024 * 1024, 
1000, 1, 0);
 
-        let (batch, handle) = build_ready_batch(accumulator.as_ref(), cluster, table_path).await?;
+        let (batch, handle) = build_ready_batch(accumulator.as_ref(), cluster, table_path)?;
         let request_buckets = vec![batch.table_bucket.clone()];
         let mut records_by_bucket = HashMap::new();
         records_by_bucket.insert(batch.table_bucket.clone(), batch);
diff --git a/crates/fluss/src/client/write/writer_client.rs b/crates/fluss/src/client/write/writer_client.rs
index c386adf..330affa 100644
--- a/crates/fluss/src/client/write/writer_client.rs
+++ b/crates/fluss/src/client/write/writer_client.rs
@@ -91,7 +91,7 @@ impl WriterClient {
         }
     }
 
-    pub async fn send(&self, record: &WriteRecord<'_>) -> Result<ResultHandle> {
+    pub fn send(&self, record: &WriteRecord<'_>) -> Result<ResultHandle> {
         let physical_table_path = &record.physical_table_path;
         let cluster = self.metadata.get_cluster();
         let bucket_key = record.bucket_key.as_ref();
@@ -99,19 +99,13 @@ impl WriterClient {
         let (bucket_assigner, bucket_id) =
             self.assign_bucket(&record.table_info, bucket_key, physical_table_path)?;
 
-        let mut result = self
-            .accumulate
-            .append(record, bucket_id, &cluster, true)
-            .await?;
+        let mut result = self.accumulate.append(record, bucket_id, &cluster, true)?;
 
         if result.abort_record_for_new_batch {
             let prev_bucket_id = bucket_id;
             bucket_assigner.on_new_batch(&cluster, prev_bucket_id);
             let bucket_id = bucket_assigner.assign_bucket(bucket_key, &cluster)?;
-            result = self
-                .accumulate
-                .append(record, bucket_id, &cluster, false)
-                .await?;
+            result = self.accumulate.append(record, bucket_id, &cluster, false)?;
         }
 
         if result.batch_is_full || result.new_batch_created {
diff --git a/crates/fluss/tests/integration/kv_table.rs b/crates/fluss/tests/integration/kv_table.rs
index 0bfe4a3..ab5f5b6 100644
--- a/crates/fluss/tests/integration/kv_table.rs
+++ b/crates/fluss/tests/integration/kv_table.rs
@@ -101,10 +101,7 @@ mod kv_table_test {
             row.set_field(0, *id);
             row.set_field(1, *name);
             row.set_field(2, *age);
-            upsert_writer
-                .upsert(&row)
-                .await
-                .expect("Failed to upsert row");
+            upsert_writer.upsert(&row).expect("Failed to upsert row");
         }
         upsert_writer.flush().await.expect("Failed to flush");
 
@@ -138,7 +135,6 @@ mod kv_table_test {
         updated_row.set_field(2, 33i64);
         upsert_writer
             .upsert(&updated_row)
-            .await
             .expect("Failed to upsert updated row")
             .await
             .expect("Failed to wait for upsert acknowledgment");
@@ -168,7 +164,6 @@ mod kv_table_test {
         delete_row.set_field(0, 1);
         upsert_writer
             .delete(&delete_row)
-            .await
             .expect("Failed to delete")
             .await
             .expect("Failed to wait for delete acknowledgment");
@@ -268,7 +263,7 @@ mod kv_table_test {
             row.set_field(0, *region);
             row.set_field(1, *user_id);
             row.set_field(2, *score);
-            upsert_writer.upsert(&row).await.expect("Failed to upsert");
+            upsert_writer.upsert(&row).expect("Failed to upsert");
         }
         upsert_writer.flush().await.expect("Failed to flush");
 
@@ -308,7 +303,6 @@ mod kv_table_test {
         update_row.set_field(2, 500i64);
         upsert_writer
             .upsert(&update_row)
-            .await
             .expect("Failed to update")
             .await
             .expect("Failed to wait for update acknowledgment");
@@ -379,7 +373,6 @@ mod kv_table_test {
         row.set_field(3, 6942i64);
         upsert_writer
             .upsert(&row)
-            .await
             .expect("Failed to upsert initial row")
             .await
             .expect("Failed to wait for upsert acknowledgment");
@@ -421,7 +414,6 @@ mod kv_table_test {
         partial_row.set_field(3, 420i64);
         partial_writer
             .upsert(&partial_row)
-            .await
             .expect("Failed to upsert")
             .await
             .expect("Failed to wait for upsert acknowledgment");
@@ -509,7 +501,7 @@ mod kv_table_test {
             row.set_field(1, *user_id);
             row.set_field(2, *name);
             row.set_field(3, *score);
-            upsert_writer.upsert(&row).await.expect("Failed to upsert");
+            upsert_writer.upsert(&row).expect("Failed to upsert");
         }
         upsert_writer.flush().await.expect("Failed to flush");
 
@@ -546,7 +538,6 @@ mod kv_table_test {
         updated_row.set_field(3, 999i64);
         upsert_writer
             .upsert(&updated_row)
-            .await
             .expect("Failed to upsert updated row")
             .await
             .expect("Failed to wait for upsert acknowledgment");
@@ -585,7 +576,6 @@ mod kv_table_test {
         delete_key.set_field(1, 1);
         upsert_writer
             .delete(&delete_key)
-            .await
             .expect("Failed to delete")
             .await
             .expect("Failed to wait for delete acknowledgment");
@@ -721,7 +711,6 @@ mod kv_table_test {
 
         upsert_writer
             .upsert(&row)
-            .await
             .expect("Failed to upsert row with all datatypes")
             .await
             .expect("Failed to wait for upsert acknowledgment");
@@ -826,7 +815,6 @@ mod kv_table_test {
 
         upsert_writer
             .upsert(&row_with_nulls)
-            .await
             .expect("Failed to upsert row with nulls")
             .await
             .expect("Failed to wait for upsert acknowledgment");
diff --git a/crates/fluss/tests/integration/log_table.rs b/crates/fluss/tests/integration/log_table.rs
index ed90fd0..82f8135 100644
--- a/crates/fluss/tests/integration/log_table.rs
+++ b/crates/fluss/tests/integration/log_table.rs
@@ -99,14 +99,12 @@ mod table_test {
             record_batch!(("c1", Int32, [1, 2, 3]), ("c2", Utf8, ["a1", "a2", 
"a3"])).unwrap();
         append_writer
             .append_arrow_batch(batch1)
-            .await
             .expect("Failed to append batch");
 
         let batch2 =
             record_batch!(("c1", Int32, [4, 5, 6]), ("c2", Utf8, ["a4", "a5", 
"a6"])).unwrap();
         append_writer
             .append_arrow_batch(batch2)
-            .await
             .expect("Failed to append batch");
 
         // Flush to ensure all writes are acknowledged
@@ -230,7 +228,6 @@ mod table_test {
         .unwrap();
         append_writer
             .append_arrow_batch(batch)
-            .await
             .expect("Failed to append batch");
 
         // Flush to ensure all writes are acknowledged
@@ -332,7 +329,6 @@ mod table_test {
         .unwrap();
         append_writer
             .append_arrow_batch(batch)
-            .await
             .expect("Failed to append batch");
         append_writer.flush().await.expect("Failed to flush");
 
@@ -485,19 +481,16 @@ mod table_test {
             .append_arrow_batch(
                 record_batch!(("id", Int32, [1, 2]), ("name", Utf8, ["a", 
"b"])).unwrap(),
             )
-            .await
             .unwrap();
         writer
             .append_arrow_batch(
                 record_batch!(("id", Int32, [3, 4]), ("name", Utf8, ["c", 
"d"])).unwrap(),
             )
-            .await
             .unwrap();
         writer
             .append_arrow_batch(
                 record_batch!(("id", Int32, [5, 6]), ("name", Utf8, ["e", 
"f"])).unwrap(),
             )
-            .await
             .unwrap();
         writer.flush().await.unwrap();
 
@@ -536,7 +529,6 @@ mod table_test {
             .append_arrow_batch(
                 record_batch!(("id", Int32, [7, 8]), ("name", Utf8, ["g", 
"h"])).unwrap(),
             )
-            .await
             .unwrap();
         writer.flush().await.unwrap();
 
@@ -752,7 +744,6 @@ mod table_test {
 
         append_writer
             .append(&row)
-            .await
             .expect("Failed to append row with all datatypes");
 
         // Append a row with null values for all columns
@@ -763,7 +754,6 @@ mod table_test {
 
         append_writer
             .append(&row_with_nulls)
-            .await
             .expect("Failed to append row with nulls");
 
         append_writer.flush().await.expect("Failed to flush");
@@ -1026,10 +1016,7 @@ mod table_test {
             row.set_field(0, *id);
             row.set_field(1, *region);
             row.set_field(2, *value);
-            append_writer
-                .append(&row)
-                .await
-                .expect("Failed to append row");
+            append_writer.append(&row).expect("Failed to append row");
         }
 
         append_writer.flush().await.expect("Failed to flush");
@@ -1044,7 +1031,6 @@ mod table_test {
         .unwrap();
         append_writer
             .append_arrow_batch(us_batch)
-            .await
             .expect("Failed to append US batch");
 
         let eu_batch = record_batch!(
@@ -1055,7 +1041,6 @@ mod table_test {
         .unwrap();
         append_writer
             .append_arrow_batch(eu_batch)
-            .await
             .expect("Failed to append EU batch");
 
         append_writer
diff --git a/crates/fluss/tests/integration/table_remote_scan.rs b/crates/fluss/tests/integration/table_remote_scan.rs
index 0efe388..baac772 100644
--- a/crates/fluss/tests/integration/table_remote_scan.rs
+++ b/crates/fluss/tests/integration/table_remote_scan.rs
@@ -148,12 +148,11 @@ mod table_remote_scan_test {
             row.set_field(0, i as i32);
             let v = format!("v{}", i);
             row.set_field(1, v.as_str());
-            append_writer
-                .append(&row)
-                .await
-                .expect("Failed to append row");
+            append_writer.append(&row).expect("Failed to append row");
         }
 
+        append_writer.flush().await.expect("Failed to flush");
+
         // Create a log scanner and subscribe to all buckets to read appended records
         let num_buckets = table.table_info().get_num_buckets();
         let log_scanner = table
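
The flush added in this test is now load-bearing: with `append` only queueing, records can still sit in the client-side accumulator until `flush` completes, so any read-after-write (like the scan that follows) must flush first:

    // Queue writes; they may still be buffered client-side...
    append_writer.append(&row).expect("Failed to append row");
    // ...so flush before scanning, or the scanner may observe none of them.
    append_writer.flush().await.expect("Failed to flush");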
