This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 26edb7a chore: remove async from queueing while writing (#271)
26edb7a is described below
commit 26edb7ac2c956ee9b19def250aa876a7382e8ace
Author: Anton Borisov <[email protected]>
AuthorDate: Sat Feb 7 14:16:30 2026 +0000
chore: remove async from queueing while writing (#271)
---
README.md | 3 +-
bindings/cpp/src/lib.rs | 7 +-
bindings/python/example/example.py | 33 +++---
bindings/python/src/lib.rs | 3 +
bindings/python/src/table.rs | 88 +++++----------
bindings/python/src/upsert.rs | 124 ++++++++-------------
bindings/python/src/write_handle.rs | 80 +++++++++++++
crates/examples/src/example_kv_table.rs | 6 +-
.../examples/src/example_partitioned_kv_table.rs | 6 +-
crates/examples/src/example_table.rs | 4 +-
crates/fluss/src/client/table/append.rs | 8 +-
crates/fluss/src/client/table/upsert.rs | 8 +-
crates/fluss/src/client/write/accumulator.rs | 64 +++++------
crates/fluss/src/client/write/mod.rs | 4 +-
crates/fluss/src/client/write/sender.rs | 86 +++++++-------
crates/fluss/src/client/write/writer_client.rs | 12 +-
crates/fluss/tests/integration/kv_table.rs | 18 +--
crates/fluss/tests/integration/log_table.rs | 17 +--
.../fluss/tests/integration/table_remote_scan.rs | 7 +-
19 files changed, 269 insertions(+), 309 deletions(-)
diff --git a/README.md b/README.md
index ee9478c..5e771d9 100644
--- a/README.md
+++ b/README.md
@@ -101,7 +101,8 @@ pub async fn main() -> Result<()> {
let table = conn.get_table(&table_path).await;
let append_writer = table.new_append().create_writer();
let batch = record_batch!(("c1", Int32, [1, 2, 3, 4, 5, 6]), ("c2", Utf8,
["a1", "a2", "a3", "a4", "a5", "a6"])).unwrap();
- append_writer.append(batch).await?;
+ append_writer.append(batch)?;
+ append_writer.flush().await?;
println!("Start to scan log records......");
// 4: scan the records
let log_scanner = table.new_scan().create_log_scanner();
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index 7944c10..4957c99 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -761,8 +761,9 @@ impl AppendWriter {
fn append(&mut self, row: &ffi::FfiGenericRow) -> Result<Box<WriteResult>,
String> {
let generic_row = types::ffi_row_to_core(row);
- let result_future = RUNTIME
- .block_on(async { self.inner.append(&generic_row).await })
+ let result_future = self
+ .inner
+ .append(&generic_row)
.map_err(|e| format!("Failed to append: {e}"))?;
Ok(Box::new(WriteResult {
@@ -789,7 +790,7 @@ impl WriteResult {
Err(e) => err_result(1, e.to_string()),
}
} else {
- ok_result()
+ err_result(1, "WriteResult already consumed".to_string())
}
}
}
diff --git a/bindings/python/example/example.py
b/bindings/python/example/example.py
index 1cabaa5..d56879a 100644
--- a/bindings/python/example/example.py
+++ b/bindings/python/example/example.py
@@ -184,7 +184,7 @@ async def main():
# Test 3: Append single rows with Date, Time, Timestamp, Decimal
print("\n--- Testing single row append with temporal/decimal types
---")
# Dict input with all types including Date, Time, Timestamp, Decimal
- await append_writer.append(
+ append_writer.append(
{
"id": 8,
"name": "Helen",
@@ -200,7 +200,7 @@ async def main():
print("Successfully appended row (dict with Date, Time, Timestamp,
Decimal)")
# List input with all types
- await append_writer.append(
+ append_writer.append(
[
9,
"Ivan",
@@ -242,7 +242,7 @@ async def main():
# Flush all pending data
print("\n--- Flushing data ---")
- append_writer.flush()
+ await append_writer.flush()
print("Successfully flushed data")
# Demo: Check offsets after writes
@@ -422,9 +422,9 @@ async def main():
upsert_writer = pk_table.new_upsert()
print(f"Created upsert writer: {upsert_writer}")
- # Fire-and-forget: queue writes without waiting for individual acks.
+ # Fire-and-forget: queue writes synchronously, flush at end.
# Records are batched internally for efficiency.
- await upsert_writer.upsert(
+ upsert_writer.upsert(
{
"user_id": 1,
"name": "Alice",
@@ -441,7 +441,7 @@ async def main():
)
print("Queued user_id=1 (Alice)")
- await upsert_writer.upsert(
+ upsert_writer.upsert(
{
"user_id": 2,
"name": "Bob",
@@ -456,7 +456,7 @@ async def main():
)
print("Queued user_id=2 (Bob)")
- await upsert_writer.upsert(
+ upsert_writer.upsert(
{
"user_id": 3,
"name": "Charlie",
@@ -479,7 +479,7 @@ async def main():
# the server confirms this specific write, useful when you need to
# read-after-write or verify critical updates.
print("\n--- Testing Upsert (per-record acknowledgment) ---")
- ack = await upsert_writer.upsert(
+ handle = upsert_writer.upsert(
{
"user_id": 1,
"name": "Alice Updated",
@@ -494,7 +494,7 @@ async def main():
"balance": Decimal("2345.67"),
}
)
- await ack # wait for server acknowledgment before proceeding
+ await handle.wait() # wait for server acknowledgment
print("Updated user_id=1 (Alice -> Alice Updated) — server
acknowledged")
except Exception as e:
@@ -554,9 +554,8 @@ async def main():
try:
upsert_writer = pk_table.new_upsert()
- # Per-record ack for delete — await the handle to confirm deletion
- ack = await upsert_writer.delete({"user_id": 3})
- await ack
+ handle = upsert_writer.delete({"user_id": 3})
+ await handle.wait()
print("Deleted user_id=3 — server acknowledged")
lookuper = pk_table.new_lookup()
@@ -670,12 +669,12 @@ async def main():
partitioned_writer = await partitioned_table.new_append_writer()
# Append data to US partition
- await partitioned_writer.append({"id": 1, "region": "US", "value":
100})
- await partitioned_writer.append({"id": 2, "region": "US", "value":
200})
+ partitioned_writer.append({"id": 1, "region": "US", "value": 100})
+ partitioned_writer.append({"id": 2, "region": "US", "value": 200})
# Append data to EU partition
- await partitioned_writer.append({"id": 3, "region": "EU", "value":
300})
- await partitioned_writer.append({"id": 4, "region": "EU", "value":
400})
- partitioned_writer.flush()
+ partitioned_writer.append({"id": 3, "region": "EU", "value": 300})
+ partitioned_writer.append({"id": 4, "region": "EU", "value": 400})
+ await partitioned_writer.flush()
print("\nWrote 4 records (2 to US, 2 to EU)")
# Demo: list_partition_offsets
diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs
index ae7f6c5..f1f4ee6 100644
--- a/bindings/python/src/lib.rs
+++ b/bindings/python/src/lib.rs
@@ -30,6 +30,7 @@ mod metadata;
mod table;
mod upsert;
mod utils;
+mod write_handle;
pub use admin::*;
pub use config::*;
@@ -40,6 +41,7 @@ pub use metadata::*;
pub use table::*;
pub use upsert::*;
pub use utils::*;
+pub use write_handle::*;
static TOKIO_RUNTIME: LazyLock<Runtime> = LazyLock::new(|| {
tokio::runtime::Builder::new_multi_thread()
@@ -88,6 +90,7 @@ fn _fluss(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<RecordBatch>()?;
m.add_class::<PartitionInfo>()?;
m.add_class::<OffsetType>()?;
+ m.add_class::<WriteResultHandle>()?;
// Register constants
m.add("EARLIEST_OFFSET", fcore::client::EARLIEST_OFFSET)?;
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index e987d43..8af6b13 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -533,76 +533,43 @@ impl AppendWriter {
let batch_list: Vec<Py<PyAny>> = batches.extract(py)?;
for batch in batch_list {
- // Drop the ack coroutine — fire-and-forget
- let _ = self.write_arrow_batch(py, batch)?;
+ // Drop the handle — fire-and-forget for bulk writes
+ drop(self.write_arrow_batch(py, batch)?);
}
Ok(())
}
- /// Write Arrow batch data
+ /// Write Arrow batch data.
///
/// Returns:
- /// A coroutine that can be awaited for server acknowledgment,
- /// or ignored for fire-and-forget behavior.
- pub fn write_arrow_batch<'py>(
- &self,
- py: Python<'py>,
- batch: Py<PyAny>,
- ) -> PyResult<Bound<'py, PyAny>> {
+ /// WriteResultHandle that can be ignored (fire-and-forget) or
+ /// awaited via `handle.wait()` for server acknowledgment.
+ pub fn write_arrow_batch(&self, py: Python, batch: Py<PyAny>) ->
PyResult<WriteResultHandle> {
// This shares the underlying Arrow buffers without copying data
let batch_bound = batch.bind(py);
let rust_batch: ArrowRecordBatch =
FromPyArrow::from_pyarrow_bound(batch_bound)
.map_err(|e| FlussError::new_err(format!("Failed to convert
RecordBatch: {e}")))?;
- let inner = self.inner.clone();
-
- future_into_py(py, async move {
- let result_future = inner
- .append_arrow_batch(rust_batch)
- .await
- .map_err(|e| FlussError::new_err(e.to_string()))?;
-
- Python::attach(|py| {
- future_into_py(py, async move {
- result_future
- .await
- .map_err(|e| FlussError::new_err(e.to_string()))?;
- Ok(())
- })
- .map(|bound| bound.unbind())
- })
- })
+ let result_future = self
+ .inner
+ .append_arrow_batch(rust_batch)
+ .map_err(|e| FlussError::new_err(e.to_string()))?;
+ Ok(WriteResultHandle::new(result_future))
}
- /// Append a single row to the table
+ /// Append a single row to the table.
///
/// Returns:
- /// A coroutine that can be awaited for server acknowledgment,
- /// or ignored for fire-and-forget behavior.
- pub fn append<'py>(
- &self,
- py: Python<'py>,
- row: &Bound<'py, PyAny>,
- ) -> PyResult<Bound<'py, PyAny>> {
+ /// WriteResultHandle that can be ignored (fire-and-forget) or
+ /// awaited via `handle.wait()` for server acknowledgment.
+ pub fn append(&self, row: &Bound<'_, PyAny>) ->
PyResult<WriteResultHandle> {
let generic_row = python_to_generic_row(row, &self.table_info)?;
- let inner = self.inner.clone();
-
- future_into_py(py, async move {
- let result_future = inner
- .append(&generic_row)
- .await
- .map_err(|e| FlussError::new_err(e.to_string()))?;
- Python::attach(|py| {
- future_into_py(py, async move {
- result_future
- .await
- .map_err(|e| FlussError::new_err(e.to_string()))?;
- Ok(())
- })
- .map(|bound| bound.unbind())
- })
- })
+ let result_future = self
+ .inner
+ .append(&generic_row)
+ .map_err(|e| FlussError::new_err(e.to_string()))?;
+ Ok(WriteResultHandle::new(result_future))
}
/// Write Pandas DataFrame data
@@ -636,16 +603,13 @@ impl AppendWriter {
}
/// Flush any pending data
- pub fn flush(&self, py: Python) -> PyResult<()> {
+ pub fn flush<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
let inner = self.inner.clone();
- // Release the GIL before blocking on I/O
- py.detach(|| {
- TOKIO_RUNTIME.block_on(async {
- inner
- .flush()
- .await
- .map_err(|e| FlussError::new_err(e.to_string()))
- })
+ future_into_py(py, async move {
+ inner
+ .flush()
+ .await
+ .map_err(|e| FlussError::new_err(e.to_string()))
})
}
diff --git a/bindings/python/src/upsert.rs b/bindings/python/src/upsert.rs
index 5c10ded..0aa69d7 100644
--- a/bindings/python/src/upsert.rs
+++ b/bindings/python/src/upsert.rs
@@ -18,26 +18,24 @@
use crate::table::{python_pk_to_generic_row, python_to_generic_row};
use crate::*;
use pyo3_async_runtimes::tokio::future_into_py;
-use std::sync::Arc;
-use tokio::sync::Mutex;
+use std::sync::{Arc, Mutex};
/// Writer for upserting and deleting data in a Fluss primary key table.
///
-/// Each upsert/delete operation queues the write and returns a coroutine
-/// that can be awaited for per-record acknowledgment, or ignored for
-/// fire-and-forget semantics (call `flush()` to ensure delivery).
+/// Each upsert/delete operation synchronously queues the write. Call `flush()`
+/// to ensure all queued writes are delivered to the server.
///
/// # Example:
/// writer = table.new_upsert()
///
-/// # Fire-and-forget with flush
-/// await writer.upsert(row1)
-/// await writer.upsert(row2)
+/// # Fire-and-forget — ignore the returned handle
+/// writer.upsert(row1)
+/// writer.upsert(row2)
/// await writer.flush()
///
-/// # Or await individual acknowledgment
-/// ack = await writer.upsert(row3)
-/// await ack
+/// # Per-record ack — call wait() on the handle
+/// handle = writer.upsert(critical_row)
+/// await handle.wait()
#[pyclass]
pub struct UpsertWriter {
inner: Arc<UpsertWriterInner>,
@@ -46,7 +44,7 @@ pub struct UpsertWriter {
struct UpsertWriterInner {
table_upsert: fcore::client::TableUpsert,
/// Lazily initialized writer - created on first write operation
- writer: Mutex<Option<fcore::client::UpsertWriter>>,
+ writer: Mutex<Option<Arc<fcore::client::UpsertWriter>>>,
table_info: fcore::metadata::TableInfo,
}
@@ -57,100 +55,65 @@ impl UpsertWriter {
/// If a row with the same primary key exists, it will be updated.
/// Otherwise, a new row will be inserted.
///
+ /// The write is queued synchronously. Call `flush()` to ensure delivery.
+ ///
/// Args:
/// row: A dict, list, or tuple containing the row data.
/// For dict: keys are column names, values are column values.
/// For list/tuple: values must be in schema order.
- ///
- /// Returns:
- /// A coroutine that can be awaited for server acknowledgment,
- /// or ignored for fire-and-forget behavior.
- pub fn upsert<'py>(
- &self,
- py: Python<'py>,
- row: &Bound<'_, PyAny>,
- ) -> PyResult<Bound<'py, PyAny>> {
+ pub fn upsert(&self, row: &Bound<'_, PyAny>) ->
PyResult<WriteResultHandle> {
let generic_row = python_to_generic_row(row, &self.inner.table_info)?;
- let inner = self.inner.clone();
-
- future_into_py(py, async move {
- let mut guard = inner.get_or_create_writer().await?;
- let writer = guard.as_mut().unwrap();
- let result_future = writer
- .upsert(&generic_row)
- .await
- .map_err(|e| FlussError::new_err(e.to_string()))?;
- Python::attach(|py| {
- future_into_py(py, async move {
- result_future
- .await
- .map_err(|e| FlussError::new_err(e.to_string()))?;
- Ok(())
- })
- .map(|bound| bound.unbind())
- })
- })
+ let writer = self.inner.get_or_create_writer()?;
+ let result_future = writer
+ .upsert(&generic_row)
+ .map_err(|e| FlussError::new_err(e.to_string()))?;
+ Ok(WriteResultHandle::new(result_future))
}
/// Delete a row from the table by primary key.
///
+ /// The delete is queued synchronously. Call `flush()` to ensure delivery.
+ ///
/// Args:
/// pk: A dict, list, or tuple containing only the primary key values.
/// For dict: keys are PK column names.
/// For list/tuple: values in PK column order.
- ///
- /// Returns:
- /// A coroutine that can be awaited for server acknowledgment,
- /// or ignored for fire-and-forget behavior.
- pub fn delete<'py>(
- &self,
- py: Python<'py>,
- pk: &Bound<'_, PyAny>,
- ) -> PyResult<Bound<'py, PyAny>> {
+ pub fn delete(&self, pk: &Bound<'_, PyAny>) -> PyResult<WriteResultHandle>
{
let generic_row = python_pk_to_generic_row(pk,
&self.inner.table_info)?;
- let inner = self.inner.clone();
-
- future_into_py(py, async move {
- let mut guard = inner.get_or_create_writer().await?;
- let writer = guard.as_mut().unwrap();
- let result_future = writer
- .delete(&generic_row)
- .await
- .map_err(|e| FlussError::new_err(e.to_string()))?;
- Python::attach(|py| {
- future_into_py(py, async move {
- result_future
- .await
- .map_err(|e| FlussError::new_err(e.to_string()))?;
- Ok(())
- })
- .map(|bound| bound.unbind())
- })
- })
+ let writer = self.inner.get_or_create_writer()?;
+ let result_future = writer
+ .delete(&generic_row)
+ .map_err(|e| FlussError::new_err(e.to_string()))?;
+ Ok(WriteResultHandle::new(result_future))
}
/// Flush all pending upsert/delete operations to the server.
///
- /// This method sends all buffered operations and blocks until they are
+ /// This method sends all buffered operations and waits until they are
/// acknowledged according to the writer's ack configuration.
///
/// Returns:
/// None on success
pub fn flush<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
- let inner = self.inner.clone();
+ // Clone the Arc<UpsertWriter> out of the lock so we don't hold the
guard across await
+ let writer = {
+ let guard = self
+ .inner
+ .writer
+ .lock()
+ .map_err(|e| FlussError::new_err(format!("Lock poisoned:
{e}")))?;
+ guard.as_ref().cloned()
+ };
future_into_py(py, async move {
- let writer_guard = inner.writer.lock().await;
-
- if let Some(writer) = writer_guard.as_ref() {
+ if let Some(writer) = writer {
writer
.flush()
.await
.map_err(|e| FlussError::new_err(e.to_string()))
} else {
- // Nothing to flush - no writer was created yet
Ok(())
}
})
@@ -197,17 +160,18 @@ impl UpsertWriter {
impl UpsertWriterInner {
/// Get the cached writer or create one on first use.
- async fn get_or_create_writer(
- &self,
- ) -> PyResult<tokio::sync::MutexGuard<'_,
Option<fcore::client::UpsertWriter>>> {
- let mut guard = self.writer.lock().await;
+ fn get_or_create_writer(&self) ->
PyResult<Arc<fcore::client::UpsertWriter>> {
+ let mut guard = self
+ .writer
+ .lock()
+ .map_err(|e| FlussError::new_err(format!("Lock poisoned: {e}")))?;
if guard.is_none() {
let writer = self
.table_upsert
.create_writer()
.map_err(|e| FlussError::new_err(e.to_string()))?;
- *guard = Some(writer);
+ *guard = Some(Arc::new(writer));
}
- Ok(guard)
+ Ok(guard.as_ref().unwrap().clone())
}
}
diff --git a/bindings/python/src/write_handle.rs
b/bindings/python/src/write_handle.rs
new file mode 100644
index 0000000..4f3ce99
--- /dev/null
+++ b/bindings/python/src/write_handle.rs
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::*;
+use pyo3_async_runtimes::tokio::future_into_py;
+use std::sync::Mutex;
+
+/// Handle for a pending write operation.
+///
+/// Returned by `upsert()`, `delete()`, `append()`, etc.
+/// Can be safely ignored for fire-and-forget semantics,
+/// or awaited via `wait()` for per-record acknowledgment.
+///
+/// # Example:
+/// # Fire-and-forget — just ignore the handle
+/// writer.upsert(row1)
+/// writer.upsert(row2)
+/// await writer.flush()
+///
+/// # Per-record ack — call wait()
+/// handle = writer.upsert(critical_row)
+/// await handle.wait()
+#[pyclass]
+pub struct WriteResultHandle {
+ inner: Mutex<Option<fcore::client::WriteResultFuture>>,
+}
+
+impl WriteResultHandle {
+ pub fn new(future: fcore::client::WriteResultFuture) -> Self {
+ Self {
+ inner: Mutex::new(Some(future)),
+ }
+ }
+}
+
+#[pymethods]
+impl WriteResultHandle {
+ /// Wait for server acknowledgment of this specific write.
+ ///
+ /// Returns:
+ /// None on success, raises FlussError on failure.
+ pub fn wait<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
+ let future = self
+ .inner
+ .lock()
+ .map_err(|e| FlussError::new_err(format!("Lock poisoned: {e}")))?
+ .take()
+ .ok_or_else(|| FlussError::new_err("WriteResultHandle already
consumed"))?;
+
+ future_into_py(py, async move {
+ future
+ .await
+ .map_err(|e| FlussError::new_err(e.to_string()))?;
+ Ok(())
+ })
+ }
+
+ fn __repr__(&self) -> String {
+ let consumed = self.inner.lock().map(|g| g.is_none()).unwrap_or(false);
+ if consumed {
+ "WriteResultHandle(consumed)".to_string()
+ } else {
+ "WriteResultHandle(pending)".to_string()
+ }
+ }
+}
diff --git a/crates/examples/src/example_kv_table.rs
b/crates/examples/src/example_kv_table.rs
index 2fcb134..3acf73f 100644
--- a/crates/examples/src/example_kv_table.rs
+++ b/crates/examples/src/example_kv_table.rs
@@ -62,7 +62,7 @@ pub async fn main() -> Result<()> {
row.set_field(0, id);
row.set_field(1, name);
row.set_field(2, age);
- upsert_writer.upsert(&row).await?;
+ upsert_writer.upsert(&row)?;
println!("Upserted: {row:?}");
}
upsert_writer.flush().await?;
@@ -85,7 +85,7 @@ pub async fn main() -> Result<()> {
row.set_field(0, 1);
row.set_field(1, "Verso");
row.set_field(2, 33i64);
- upsert_writer.upsert(&row).await?.await?;
+ upsert_writer.upsert(&row)?.await?;
println!("Updated: {row:?}");
let result = lookuper.lookup(&make_key(1)).await?;
@@ -100,7 +100,7 @@ pub async fn main() -> Result<()> {
// For delete, only primary key field needs to be set; other fields can
remain null
let mut row = GenericRow::new(3);
row.set_field(0, 2);
- upsert_writer.delete(&row).await?.await?;
+ upsert_writer.delete(&row)?.await?;
println!("Deleted row with id=2");
let result = lookuper.lookup(&make_key(2)).await?;
diff --git a/crates/examples/src/example_partitioned_kv_table.rs
b/crates/examples/src/example_partitioned_kv_table.rs
index feb8f05..ee1f541 100644
--- a/crates/examples/src/example_partitioned_kv_table.rs
+++ b/crates/examples/src/example_partitioned_kv_table.rs
@@ -74,7 +74,7 @@ pub async fn main() -> Result<()> {
row.set_field(1, region);
row.set_field(2, zone);
row.set_field(3, score);
- upsert_writer.upsert(&row).await?;
+ upsert_writer.upsert(&row)?;
println!("Upserted: {row:?}");
}
upsert_writer.flush().await?;
@@ -102,7 +102,7 @@ pub async fn main() -> Result<()> {
row.set_field(1, "APAC");
row.set_field(2, 1i64);
row.set_field(3, 4321i64);
- upsert_writer.upsert(&row).await?.await?;
+ upsert_writer.upsert(&row)?.await?;
println!("Updated: {row:?}");
let result = lookuper.lookup(&make_key(1001, "APAC", 1)).await?;
@@ -118,7 +118,7 @@ pub async fn main() -> Result<()> {
row.set_field(0, 1002);
row.set_field(1, "EMEA");
row.set_field(2, 2i64);
- upsert_writer.delete(&row).await?.await?;
+ upsert_writer.delete(&row)?.await?;
println!("Deleted: {row:?}");
let result = lookuper.lookup(&make_key(1002, "EMEA", 2)).await?;
diff --git a/crates/examples/src/example_table.rs
b/crates/examples/src/example_table.rs
index ee9bc7b..199fce2 100644
--- a/crates/examples/src/example_table.rs
+++ b/crates/examples/src/example_table.rs
@@ -64,12 +64,12 @@ pub async fn main() -> Result<()> {
let table = conn.get_table(&table_path).await?;
let append_writer = table.new_append()?.create_writer()?;
// Fire-and-forget: queue writes then flush
- append_writer.append(&row).await?;
+ append_writer.append(&row)?;
let mut row = GenericRow::new(3);
row.set_field(0, 233333);
row.set_field(1, "tt44");
row.set_field(2, 987_654_321_987i64);
- append_writer.append(&row).await?;
+ append_writer.append(&row)?;
append_writer.flush().await?;
// scan rows
diff --git a/crates/fluss/src/client/table/append.rs
b/crates/fluss/src/client/table/append.rs
index e26b61a..942253f 100644
--- a/crates/fluss/src/client/table/append.rs
+++ b/crates/fluss/src/client/table/append.rs
@@ -80,7 +80,7 @@ impl AppendWriter {
/// # Returns
/// A [`WriteResultFuture`] that can be awaited to wait for server
acknowledgment,
/// or dropped for fire-and-forget behavior (use `flush()` to ensure
delivery).
- pub async fn append<R: InternalRow>(&self, row: &R) ->
Result<WriteResultFuture> {
+ pub fn append<R: InternalRow>(&self, row: &R) -> Result<WriteResultFuture>
{
let physical_table_path = Arc::new(get_physical_path(
&self.table_path,
self.partition_getter.as_ref(),
@@ -92,7 +92,7 @@ impl AppendWriter {
self.table_info.schema_id,
row,
);
- let result_handle = self.writer_client.send(&record).await?;
+ let result_handle = self.writer_client.send(&record)?;
Ok(WriteResultFuture::new(result_handle))
}
@@ -107,7 +107,7 @@ impl AppendWriter {
/// # Returns
/// A [`WriteResultFuture`] that can be awaited to wait for server
acknowledgment,
/// or dropped for fire-and-forget behavior (use `flush()` to ensure
delivery).
- pub async fn append_arrow_batch(&self, batch: RecordBatch) ->
Result<WriteResultFuture> {
+ pub fn append_arrow_batch(&self, batch: RecordBatch) ->
Result<WriteResultFuture> {
let physical_table_path = if self.partition_getter.is_some() &&
batch.num_rows() > 0 {
let first_row = ColumnarRow::new(Arc::new(batch.clone()));
Arc::new(get_physical_path(
@@ -125,7 +125,7 @@ impl AppendWriter {
self.table_info.schema_id,
batch,
);
- let result_handle = self.writer_client.send(&record).await?;
+ let result_handle = self.writer_client.send(&record)?;
Ok(WriteResultFuture::new(result_handle))
}
diff --git a/crates/fluss/src/client/table/upsert.rs
b/crates/fluss/src/client/table/upsert.rs
index a1646cc..7057b90 100644
--- a/crates/fluss/src/client/table/upsert.rs
+++ b/crates/fluss/src/client/table/upsert.rs
@@ -354,7 +354,7 @@ impl UpsertWriter {
/// # Returns
/// A [`WriteResultFuture`] that can be awaited to wait for server
acknowledgment,
/// or dropped for fire-and-forget behavior (use `flush()` to ensure
delivery).
- pub async fn upsert<R: InternalRow>(&self, row: &R) ->
Result<WriteResultFuture> {
+ pub fn upsert<R: InternalRow>(&self, row: &R) -> Result<WriteResultFuture>
{
self.check_field_count(row)?;
let (key, bucket_key) = self.get_keys(row)?;
@@ -379,7 +379,7 @@ impl UpsertWriter {
Some(row_bytes),
);
- let result_handle = self.writer_client.send(&write_record).await?;
+ let result_handle = self.writer_client.send(&write_record)?;
Ok(WriteResultFuture::new(result_handle))
}
@@ -395,7 +395,7 @@ impl UpsertWriter {
/// # Returns
/// A [`WriteResultFuture`] that can be awaited to wait for server
acknowledgment,
/// or dropped for fire-and-forget behavior (use `flush()` to ensure
delivery).
- pub async fn delete<R: InternalRow>(&self, row: &R) ->
Result<WriteResultFuture> {
+ pub fn delete<R: InternalRow>(&self, row: &R) -> Result<WriteResultFuture>
{
self.check_field_count(row)?;
let (key, bucket_key) = self.get_keys(row)?;
@@ -415,7 +415,7 @@ impl UpsertWriter {
None,
);
- let result_handle = self.writer_client.send(&write_record).await?;
+ let result_handle = self.writer_client.send(&write_record)?;
Ok(WriteResultFuture::new(result_handle))
}
}
diff --git a/crates/fluss/src/client/write/accumulator.rs
b/crates/fluss/src/client/write/accumulator.rs
index 5eae868..2c36452 100644
--- a/crates/fluss/src/client/write/accumulator.rs
+++ b/crates/fluss/src/client/write/accumulator.rs
@@ -25,11 +25,11 @@ use crate::metadata::{PhysicalTablePath, TableBucket};
use crate::util::current_time_ms;
use crate::{BucketId, PartitionId, TableId};
use dashmap::DashMap;
+use parking_lot::Mutex;
use parking_lot::RwLock;
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc;
use std::sync::atomic::{AtomicI32, AtomicI64, Ordering};
-use tokio::sync::Mutex;
// Type alias to simplify complex nested types
type BucketBatches = Vec<(BucketId, Arc<Mutex<VecDeque<WriteBatch>>>)>;
@@ -144,7 +144,7 @@ impl RecordAccumulator {
))
}
- pub async fn append(
+ pub fn append(
&self,
record: &WriteRecord<'_>,
bucket_id: BucketId,
@@ -180,7 +180,7 @@ impl RecordAccumulator {
.clone()
};
- let mut dq_guard = dq.lock().await;
+ let mut dq_guard = dq.lock();
if let Some(append_result) = self.try_append(record, &mut dq_guard)? {
return Ok(append_result);
}
@@ -193,7 +193,7 @@ impl RecordAccumulator {
self.append_new_batch(cluster, record, &mut dq_guard)
}
- pub async fn ready(&self, cluster: &Arc<Cluster>) ->
Result<ReadyCheckResult> {
+ pub fn ready(&self, cluster: &Arc<Cluster>) -> Result<ReadyCheckResult> {
// Snapshot just the Arcs we need, avoiding cloning the entire
BucketAndWriteBatches struct
let entries: Vec<(Arc<PhysicalTablePath>, Option<PartitionId>,
BucketBatches)> = self
.write_batches
@@ -216,18 +216,16 @@ impl RecordAccumulator {
let mut unknown_leader_tables = HashSet::new();
for (physical_table_path, mut partition_id, bucket_batches) in entries
{
- next_ready_check_delay_ms = self
- .bucket_ready(
- &physical_table_path,
- physical_table_path.get_partition_name().is_some(),
- &mut partition_id,
- bucket_batches,
- &mut ready_nodes,
- &mut unknown_leader_tables,
- cluster,
- next_ready_check_delay_ms,
- )
- .await?
+ next_ready_check_delay_ms = self.bucket_ready(
+ &physical_table_path,
+ physical_table_path.get_partition_name().is_some(),
+ &mut partition_id,
+ bucket_batches,
+ &mut ready_nodes,
+ &mut unknown_leader_tables,
+ cluster,
+ next_ready_check_delay_ms,
+ )?
}
Ok(ReadyCheckResult {
@@ -238,7 +236,7 @@ impl RecordAccumulator {
}
#[allow(clippy::too_many_arguments)]
- async fn bucket_ready(
+ fn bucket_ready(
&self,
physical_table_path: &Arc<PhysicalTablePath>,
is_partitioned_table: bool,
@@ -274,7 +272,7 @@ impl RecordAccumulator {
}
for (bucket_id, batch) in bucket_batches {
- let batch_guard = batch.lock().await;
+ let batch_guard = batch.lock();
if batch_guard.is_empty() {
continue;
}
@@ -316,7 +314,7 @@ impl RecordAccumulator {
next_ready_check_delay_ms
}
- pub async fn drain(
+ pub fn drain(
&self,
cluster: Arc<Cluster>,
nodes: &HashSet<ServerNode>,
@@ -327,9 +325,7 @@ impl RecordAccumulator {
}
let mut batches = HashMap::new();
for node in nodes {
- let ready = self
- .drain_batches_for_one_node(&cluster, node, max_size)
- .await?;
+ let ready = self.drain_batches_for_one_node(&cluster, node,
max_size)?;
if !ready.is_empty() {
batches.insert(node.id(), ready);
}
@@ -338,7 +334,7 @@ impl RecordAccumulator {
Ok(batches)
}
- async fn drain_batches_for_one_node(
+ fn drain_batches_for_one_node(
&self,
cluster: &Cluster,
node: &ServerNode,
@@ -352,15 +348,13 @@ impl RecordAccumulator {
return Ok(ready);
}
- // Get the start index without holding the lock across awaits
let start = {
- let mut nodes_drain_index_guard =
self.nodes_drain_index.lock().await;
+ let mut nodes_drain_index_guard = self.nodes_drain_index.lock();
let drain_index =
nodes_drain_index_guard.entry(node.id()).or_insert(0);
*drain_index % buckets.len()
};
let mut current_index = start;
- // Assigned at the start of each loop iteration (line 323), used after
loop (line 376)
let mut last_processed_index;
loop {
@@ -383,7 +377,7 @@ impl RecordAccumulator {
if let Some(deque) = deque {
let mut maybe_batch = None;
{
- let mut batch_lock = deque.lock().await;
+ let mut batch_lock = deque.lock();
if !batch_lock.is_empty() {
let first_batch = batch_lock.front().unwrap();
@@ -419,7 +413,7 @@ impl RecordAccumulator {
// Store the last processed index to maintain round-robin fairness
{
- let mut nodes_drain_index_guard =
self.nodes_drain_index.lock().await;
+ let mut nodes_drain_index_guard = self.nodes_drain_index.lock();
nodes_drain_index_guard.insert(node.id(), last_processed_index);
}
@@ -430,7 +424,7 @@ impl RecordAccumulator {
self.incomplete_batches.write().remove(&batch_id);
}
- pub async fn re_enqueue(&self, ready_write_batch: ReadyWriteBatch) {
+ pub fn re_enqueue(&self, ready_write_batch: ReadyWriteBatch) {
ready_write_batch.write_batch.re_enqueued();
let physical_table_path =
ready_write_batch.write_batch.physical_table_path();
let bucket_id = ready_write_batch.table_bucket.bucket_id();
@@ -456,7 +450,7 @@ impl RecordAccumulator {
.clone()
};
- let mut dq_guard = dq.lock().await;
+ let mut dq_guard = dq.lock();
dq_guard.push_front(ready_write_batch.write_batch);
}
@@ -604,20 +598,18 @@ mod tests {
};
let record = WriteRecord::for_append(table_info, physical_table_path,
1, &row);
- accumulator.append(&record, 0, &cluster, false).await?;
+ accumulator.append(&record, 0, &cluster, false)?;
let server = cluster.get_tablet_server(1).expect("server");
let nodes = HashSet::from([server.clone()]);
- let mut batches = accumulator
- .drain(cluster.clone(), &nodes, 1024 * 1024)
- .await?;
+ let mut batches = accumulator.drain(cluster.clone(), &nodes, 1024 *
1024)?;
let mut drained = batches.remove(&1).expect("drained batches");
let batch = drained.pop().expect("batch");
assert_eq!(batch.write_batch.attempts(), 0);
- accumulator.re_enqueue(batch).await;
+ accumulator.re_enqueue(batch);
- let mut batches = accumulator.drain(cluster, &nodes, 1024 *
1024).await?;
+ let mut batches = accumulator.drain(cluster, &nodes, 1024 * 1024)?;
let mut drained = batches.remove(&1).expect("drained batches");
let batch = drained.pop().expect("batch");
assert_eq!(batch.write_batch.attempts(), 1);
diff --git a/crates/fluss/src/client/write/mod.rs
b/crates/fluss/src/client/write/mod.rs
index 49eff05..2c848d3 100644
--- a/crates/fluss/src/client/write/mod.rs
+++ b/crates/fluss/src/client/write/mod.rs
@@ -205,8 +205,8 @@ impl ResultHandle {
/// A future that represents a pending write operation.
///
/// This type implements [`Future`], allowing users to either:
-/// 1. Await immediately to block on acknowledgment:
`writer.upsert(&row).await?.await?`
-/// 2. Fire-and-forget with later flush: `writer.upsert(&row).await?;
writer.flush().await?`
+/// 1. Await immediately to block on acknowledgment:
`writer.upsert(&row)?.await?`
+/// 2. Fire-and-forget with later flush: `writer.upsert(&row)?;
writer.flush().await?`
///
/// This pattern is similar to rdkafka's `DeliveryFuture` and allows for
efficient batching
/// when users don't need immediate per-record acknowledgment.
diff --git a/crates/fluss/src/client/write/sender.rs
b/crates/fluss/src/client/write/sender.rs
index f336d0c..069f2d2 100644
--- a/crates/fluss/src/client/write/sender.rs
+++ b/crates/fluss/src/client/write/sender.rs
@@ -78,7 +78,7 @@ impl Sender {
async fn run_once(&self) -> Result<()> {
let cluster = self.metadata.get_cluster();
- let ready_check_result = self.accumulator.ready(&cluster).await?;
+ let ready_check_result = self.accumulator.ready(&cluster)?;
// Update metadata if needed
if !ready_check_result.unknown_leader_tables.is_empty() {
@@ -124,14 +124,11 @@ impl Sender {
return Ok(());
}
- let batches = self
- .accumulator
- .drain(
- cluster.clone(),
- &ready_check_result.ready_nodes,
- self.max_request_size,
- )
- .await?;
+ let batches = self.accumulator.drain(
+ cluster.clone(),
+ &ready_check_result.ready_nodes,
+ self.max_request_size,
+ )?;
if !batches.is_empty() {
self.add_to_inflight_batches(&batches);
@@ -233,8 +230,7 @@ impl Sender {
self.handle_batches_with_local_error(
request_batches,
format!("Failed to build write request: {e}"),
- )
- .await?;
+ )?;
continue;
}
};
@@ -377,9 +373,8 @@ impl Sender {
.error_message()
.cloned()
.unwrap_or_else(|| error.message().to_string());
- if let Some(physical_table_path) = self
- .handle_write_batch_error(ready_batch, error, message)
- .await?
+ if let Some(physical_table_path) =
+ self.handle_write_batch_error(ready_batch, error,
message)?
{
invalid_metadata_tables
.insert(physical_table_path.get_table_path().clone());
@@ -392,14 +387,11 @@ impl Sender {
for bucket in pending_buckets {
if let Some(ready_batch) = records_by_bucket.remove(&bucket) {
- if let Some(physical_table_path) = self
- .handle_write_batch_error(
- ready_batch,
- FlussError::UnknownServerError,
- format!("Missing response for table bucket {bucket}"),
- )
- .await?
- {
+ if let Some(physical_table_path) =
self.handle_write_batch_error(
+ ready_batch,
+ FlussError::UnknownServerError,
+ format!("Missing response for table bucket {bucket}"),
+ )? {
invalid_metadata_tables.insert(physical_table_path.get_table_path().clone());
invalid_physical_table_paths.insert(physical_table_path);
}
@@ -438,9 +430,8 @@ impl Sender {
let mut invalid_physical_table_paths: HashSet<Arc<PhysicalTablePath>>
= HashSet::new();
for batch in batches {
- if let Some(physical_table_path) = self
- .handle_write_batch_error(batch, error, message.clone())
- .await?
+ if let Some(physical_table_path) =
+ self.handle_write_batch_error(batch, error, message.clone())?
{
invalid_metadata_tables.insert(physical_table_path.get_table_path().clone());
invalid_physical_table_paths.insert(physical_table_path);
@@ -451,7 +442,7 @@ impl Sender {
Ok(())
}
- async fn handle_batches_with_local_error(
+ fn handle_batches_with_local_error(
&self,
batches: Vec<ReadyWriteBatch>,
message: String,
@@ -467,7 +458,7 @@ impl Sender {
Ok(())
}
- async fn handle_write_batch_error(
+ fn handle_write_batch_error(
&self,
ready_write_batch: ReadyWriteBatch,
error: FlussError,
@@ -480,7 +471,7 @@ impl Sender {
physical_table_path.as_ref(),
ready_write_batch.table_bucket.bucket_id()
);
- self.re_enqueue_batch(ready_write_batch).await;
+ self.re_enqueue_batch(ready_write_batch);
return
Ok(Self::is_invalid_metadata_error(error).then_some(physical_table_path));
}
@@ -504,9 +495,9 @@ impl Sender {
Ok(Self::is_invalid_metadata_error(error).then_some(physical_table_path))
}
- async fn re_enqueue_batch(&self, ready_write_batch: ReadyWriteBatch) {
+ fn re_enqueue_batch(&self, ready_write_batch: ReadyWriteBatch) {
self.remove_from_inflight_batches(&ready_write_batch);
- self.accumulator.re_enqueue(ready_write_batch).await;
+ self.accumulator.re_enqueue(ready_write_batch);
}
fn remove_from_inflight_batches(&self, ready_write_batch:
&ReadyWriteBatch) {
@@ -656,7 +647,7 @@ mod tests {
use crate::test_utils::{build_cluster_arc, build_table_info};
use std::collections::{HashMap, HashSet};
- async fn build_ready_batch(
+ fn build_ready_batch(
accumulator: &RecordAccumulator,
cluster: Arc<Cluster>,
table_path: Arc<TablePath>,
@@ -667,11 +658,11 @@ mod tests {
values: vec![Datum::Int32(1)],
};
let record = WriteRecord::for_append(table_info, physical_table_path,
1, &row);
- let result = accumulator.append(&record, 0, &cluster, false).await?;
+ let result = accumulator.append(&record, 0, &cluster, false)?;
let result_handle = result.result_handle.expect("result handle");
let server = cluster.get_tablet_server(1).expect("server");
let nodes = HashSet::from([server.clone()]);
- let mut batches = accumulator.drain(cluster, &nodes, 1024 *
1024).await?;
+ let mut batches = accumulator.drain(cluster, &nodes, 1024 * 1024)?;
let mut drained = batches.remove(&1).expect("drained batches");
let batch = drained.pop().expect("batch");
Ok((batch, result_handle))
@@ -686,19 +677,21 @@ mod tests {
let sender = Sender::new(metadata, accumulator.clone(), 1024 * 1024,
1000, 1, 1);
let (batch, _handle) =
- build_ready_batch(accumulator.as_ref(), cluster.clone(),
table_path.clone()).await?;
+ build_ready_batch(accumulator.as_ref(), cluster.clone(),
table_path.clone())?;
let mut inflight = HashMap::new();
inflight.insert(1, vec![batch]);
sender.add_to_inflight_batches(&inflight);
let batch = inflight.remove(&1).unwrap().pop().unwrap();
- sender
- .handle_write_batch_error(batch, FlussError::RequestTimeOut,
"timeout".to_string())
- .await?;
+ sender.handle_write_batch_error(
+ batch,
+ FlussError::RequestTimeOut,
+ "timeout".to_string(),
+ )?;
let server = cluster.get_tablet_server(1).expect("server");
let nodes = HashSet::from([server.clone()]);
- let mut batches = accumulator.drain(cluster, &nodes, 1024 *
1024).await?;
+ let mut batches = accumulator.drain(cluster, &nodes, 1024 * 1024)?;
let mut drained = batches.remove(&1).expect("drained batches");
let batch = drained.pop().expect("batch");
assert_eq!(batch.write_batch.attempts(), 1);
@@ -713,15 +706,12 @@ mod tests {
let accumulator = Arc::new(RecordAccumulator::new(Config::default()));
let sender = Sender::new(metadata, accumulator.clone(), 1024 * 1024,
1000, 1, 0);
- let (batch, handle) =
- build_ready_batch(accumulator.as_ref(), cluster.clone(),
table_path).await?;
- sender
- .handle_write_batch_error(
- batch,
- FlussError::InvalidTableException,
- "invalid".to_string(),
- )
- .await?;
+ let (batch, handle) = build_ready_batch(accumulator.as_ref(),
cluster.clone(), table_path)?;
+ sender.handle_write_batch_error(
+ batch,
+ FlussError::InvalidTableException,
+ "invalid".to_string(),
+ )?;
let batch_result = handle.wait().await?;
assert!(matches!(
@@ -740,7 +730,7 @@ mod tests {
let accumulator = Arc::new(RecordAccumulator::new(Config::default()));
let sender = Sender::new(metadata, accumulator.clone(), 1024 * 1024,
1000, 1, 0);
- let (batch, handle) = build_ready_batch(accumulator.as_ref(), cluster,
table_path).await?;
+ let (batch, handle) = build_ready_batch(accumulator.as_ref(), cluster,
table_path)?;
let request_buckets = vec![batch.table_bucket.clone()];
let mut records_by_bucket = HashMap::new();
records_by_bucket.insert(batch.table_bucket.clone(), batch);
diff --git a/crates/fluss/src/client/write/writer_client.rs
b/crates/fluss/src/client/write/writer_client.rs
index c386adf..330affa 100644
--- a/crates/fluss/src/client/write/writer_client.rs
+++ b/crates/fluss/src/client/write/writer_client.rs
@@ -91,7 +91,7 @@ impl WriterClient {
}
}
- pub async fn send(&self, record: &WriteRecord<'_>) -> Result<ResultHandle>
{
+ pub fn send(&self, record: &WriteRecord<'_>) -> Result<ResultHandle> {
let physical_table_path = &record.physical_table_path;
let cluster = self.metadata.get_cluster();
let bucket_key = record.bucket_key.as_ref();
@@ -99,19 +99,13 @@ impl WriterClient {
let (bucket_assigner, bucket_id) =
self.assign_bucket(&record.table_info, bucket_key,
physical_table_path)?;
- let mut result = self
- .accumulate
- .append(record, bucket_id, &cluster, true)
- .await?;
+ let mut result = self.accumulate.append(record, bucket_id, &cluster,
true)?;
if result.abort_record_for_new_batch {
let prev_bucket_id = bucket_id;
bucket_assigner.on_new_batch(&cluster, prev_bucket_id);
let bucket_id = bucket_assigner.assign_bucket(bucket_key,
&cluster)?;
- result = self
- .accumulate
- .append(record, bucket_id, &cluster, false)
- .await?;
+ result = self.accumulate.append(record, bucket_id, &cluster,
false)?;
}
if result.batch_is_full || result.new_batch_created {
diff --git a/crates/fluss/tests/integration/kv_table.rs
b/crates/fluss/tests/integration/kv_table.rs
index 0bfe4a3..ab5f5b6 100644
--- a/crates/fluss/tests/integration/kv_table.rs
+++ b/crates/fluss/tests/integration/kv_table.rs
@@ -101,10 +101,7 @@ mod kv_table_test {
row.set_field(0, *id);
row.set_field(1, *name);
row.set_field(2, *age);
- upsert_writer
- .upsert(&row)
- .await
- .expect("Failed to upsert row");
+ upsert_writer.upsert(&row).expect("Failed to upsert row");
}
upsert_writer.flush().await.expect("Failed to flush");
@@ -138,7 +135,6 @@ mod kv_table_test {
updated_row.set_field(2, 33i64);
upsert_writer
.upsert(&updated_row)
- .await
.expect("Failed to upsert updated row")
.await
.expect("Failed to wait for upsert acknowledgment");
@@ -168,7 +164,6 @@ mod kv_table_test {
delete_row.set_field(0, 1);
upsert_writer
.delete(&delete_row)
- .await
.expect("Failed to delete")
.await
.expect("Failed to wait for delete acknowledgment");
@@ -268,7 +263,7 @@ mod kv_table_test {
row.set_field(0, *region);
row.set_field(1, *user_id);
row.set_field(2, *score);
- upsert_writer.upsert(&row).await.expect("Failed to upsert");
+ upsert_writer.upsert(&row).expect("Failed to upsert");
}
upsert_writer.flush().await.expect("Failed to flush");
@@ -308,7 +303,6 @@ mod kv_table_test {
update_row.set_field(2, 500i64);
upsert_writer
.upsert(&update_row)
- .await
.expect("Failed to update")
.await
.expect("Failed to wait for update acknowledgment");
@@ -379,7 +373,6 @@ mod kv_table_test {
row.set_field(3, 6942i64);
upsert_writer
.upsert(&row)
- .await
.expect("Failed to upsert initial row")
.await
.expect("Failed to wait for upsert acknowledgment");
@@ -421,7 +414,6 @@ mod kv_table_test {
partial_row.set_field(3, 420i64);
partial_writer
.upsert(&partial_row)
- .await
.expect("Failed to upsert")
.await
.expect("Failed to wait for upsert acknowledgment");
@@ -509,7 +501,7 @@ mod kv_table_test {
row.set_field(1, *user_id);
row.set_field(2, *name);
row.set_field(3, *score);
- upsert_writer.upsert(&row).await.expect("Failed to upsert");
+ upsert_writer.upsert(&row).expect("Failed to upsert");
}
upsert_writer.flush().await.expect("Failed to flush");
@@ -546,7 +538,6 @@ mod kv_table_test {
updated_row.set_field(3, 999i64);
upsert_writer
.upsert(&updated_row)
- .await
.expect("Failed to upsert updated row")
.await
.expect("Failed to wait for upsert acknowledgment");
@@ -585,7 +576,6 @@ mod kv_table_test {
delete_key.set_field(1, 1);
upsert_writer
.delete(&delete_key)
- .await
.expect("Failed to delete")
.await
.expect("Failed to wait for delete acknowledgment");
@@ -721,7 +711,6 @@ mod kv_table_test {
upsert_writer
.upsert(&row)
- .await
.expect("Failed to upsert row with all datatypes")
.await
.expect("Failed to wait for upsert acknowledgment");
@@ -826,7 +815,6 @@ mod kv_table_test {
upsert_writer
.upsert(&row_with_nulls)
- .await
.expect("Failed to upsert row with nulls")
.await
.expect("Failed to wait for upsert acknowledgment");
diff --git a/crates/fluss/tests/integration/log_table.rs
b/crates/fluss/tests/integration/log_table.rs
index ed90fd0..82f8135 100644
--- a/crates/fluss/tests/integration/log_table.rs
+++ b/crates/fluss/tests/integration/log_table.rs
@@ -99,14 +99,12 @@ mod table_test {
record_batch!(("c1", Int32, [1, 2, 3]), ("c2", Utf8, ["a1", "a2",
"a3"])).unwrap();
append_writer
.append_arrow_batch(batch1)
- .await
.expect("Failed to append batch");
let batch2 =
record_batch!(("c1", Int32, [4, 5, 6]), ("c2", Utf8, ["a4", "a5",
"a6"])).unwrap();
append_writer
.append_arrow_batch(batch2)
- .await
.expect("Failed to append batch");
// Flush to ensure all writes are acknowledged
@@ -230,7 +228,6 @@ mod table_test {
.unwrap();
append_writer
.append_arrow_batch(batch)
- .await
.expect("Failed to append batch");
// Flush to ensure all writes are acknowledged
@@ -332,7 +329,6 @@ mod table_test {
.unwrap();
append_writer
.append_arrow_batch(batch)
- .await
.expect("Failed to append batch");
append_writer.flush().await.expect("Failed to flush");
@@ -485,19 +481,16 @@ mod table_test {
.append_arrow_batch(
record_batch!(("id", Int32, [1, 2]), ("name", Utf8, ["a",
"b"])).unwrap(),
)
- .await
.unwrap();
writer
.append_arrow_batch(
record_batch!(("id", Int32, [3, 4]), ("name", Utf8, ["c",
"d"])).unwrap(),
)
- .await
.unwrap();
writer
.append_arrow_batch(
record_batch!(("id", Int32, [5, 6]), ("name", Utf8, ["e",
"f"])).unwrap(),
)
- .await
.unwrap();
writer.flush().await.unwrap();
@@ -536,7 +529,6 @@ mod table_test {
.append_arrow_batch(
record_batch!(("id", Int32, [7, 8]), ("name", Utf8, ["g",
"h"])).unwrap(),
)
- .await
.unwrap();
writer.flush().await.unwrap();
@@ -752,7 +744,6 @@ mod table_test {
append_writer
.append(&row)
- .await
.expect("Failed to append row with all datatypes");
// Append a row with null values for all columns
@@ -763,7 +754,6 @@ mod table_test {
append_writer
.append(&row_with_nulls)
- .await
.expect("Failed to append row with nulls");
append_writer.flush().await.expect("Failed to flush");
@@ -1026,10 +1016,7 @@ mod table_test {
row.set_field(0, *id);
row.set_field(1, *region);
row.set_field(2, *value);
- append_writer
- .append(&row)
- .await
- .expect("Failed to append row");
+ append_writer.append(&row).expect("Failed to append row");
}
append_writer.flush().await.expect("Failed to flush");
@@ -1044,7 +1031,6 @@ mod table_test {
.unwrap();
append_writer
.append_arrow_batch(us_batch)
- .await
.expect("Failed to append US batch");
let eu_batch = record_batch!(
@@ -1055,7 +1041,6 @@ mod table_test {
.unwrap();
append_writer
.append_arrow_batch(eu_batch)
- .await
.expect("Failed to append EU batch");
append_writer
diff --git a/crates/fluss/tests/integration/table_remote_scan.rs
b/crates/fluss/tests/integration/table_remote_scan.rs
index 0efe388..baac772 100644
--- a/crates/fluss/tests/integration/table_remote_scan.rs
+++ b/crates/fluss/tests/integration/table_remote_scan.rs
@@ -148,12 +148,11 @@ mod table_remote_scan_test {
row.set_field(0, i as i32);
let v = format!("v{}", i);
row.set_field(1, v.as_str());
- append_writer
- .append(&row)
- .await
- .expect("Failed to append row");
+ append_writer.append(&row).expect("Failed to append row");
}
+ append_writer.flush().await.expect("Failed to flush");
+
// Create a log scanner and subscribe to all buckets to read appended
records
let num_buckets = table.table_info().get_num_buckets();
let log_scanner = table