This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 7f20192 chore: Fire-and-forget behaviour for efficient batching (#258)
7f20192 is described below
commit 7f20192b45cc0b7bb69bf823a61d76d41f9f7d3f
Author: Anton Borisov <[email protected]>
AuthorDate: Sat Feb 7 02:17:35 2026 +0000
chore: Fire-and-forget behaviour for efficient batching (#258)
---
bindings/cpp/examples/example.cpp | 16 +++++-
bindings/cpp/include/fluss.hpp | 27 ++++++++++
bindings/cpp/src/lib.rs | 37 +++++++++++---
bindings/cpp/src/table.cpp | 57 ++++++++++++++++++++--
bindings/python/example/example.py | 39 ++++++++-------
bindings/python/src/table.rs | 54 ++++++++++++++++----
bindings/python/src/upsert.rs | 47 +++++++++++++-----
crates/examples/src/example_kv_table.rs | 5 +-
.../examples/src/example_partitioned_kv_table.rs | 5 +-
crates/examples/src/example_table.rs | 8 +--
crates/fluss/src/client/table/append.rs | 30 +++++++++---
crates/fluss/src/client/table/upsert.rs | 30 +++++++-----
crates/fluss/src/client/write/mod.rs | 41 ++++++++++++++++
crates/fluss/tests/integration/kv_table.rs | 53 ++++++++++++++------
crates/fluss/tests/integration/log_table.rs | 4 ++
15 files changed, 360 insertions(+), 93 deletions(-)
diff --git a/bindings/cpp/examples/example.cpp
b/bindings/cpp/examples/example.cpp
index 7022cad..10266c7 100644
--- a/bindings/cpp/examples/example.cpp
+++ b/bindings/cpp/examples/example.cpp
@@ -91,6 +91,7 @@ int main() {
{3, "Charlie", 92.1f, 35},
};
+ // Fire-and-forget: queue rows, flush at end
for (const auto& r : rows) {
fluss::GenericRow row;
row.SetInt32(0, r.id);
@@ -100,7 +101,20 @@ int main() {
check("append", writer.Append(row));
}
check("flush", writer.Flush());
- std::cout << "Wrote " << rows.size() << " rows" << std::endl;
+ std::cout << "Wrote " << rows.size() << " rows (fire-and-forget + flush)"
<< std::endl;
+
+ // Per-record acknowledgment
+ {
+ fluss::GenericRow row;
+ row.SetInt32(0, 100);
+ row.SetString(1, "AckTest");
+ row.SetFloat32(2, 99.9f);
+ row.SetInt32(3, 42);
+ fluss::WriteResult wr;
+ check("append", writer.Append(row, wr));
+ check("wait", wr.Wait());
+ std::cout << "Row acknowledged by server" << std::endl;
+ }
// 6) Scan
fluss::LogScanner scanner;
diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp
index 9461f68..6c20717 100644
--- a/bindings/cpp/include/fluss.hpp
+++ b/bindings/cpp/include/fluss.hpp
@@ -37,6 +37,7 @@ namespace ffi {
struct Admin;
struct Table;
struct AppendWriter;
+ struct WriteResult;
struct LogScanner;
} // namespace ffi
@@ -409,6 +410,7 @@ struct PartitionInfo {
};
class AppendWriter;
+class WriteResult;
class LogScanner;
class Admin;
class Table;
@@ -534,6 +536,30 @@ private:
std::vector<size_t> projection_;
};
+class WriteResult {
+public:
+ WriteResult() noexcept;
+ ~WriteResult() noexcept;
+
+ WriteResult(const WriteResult&) = delete;
+ WriteResult& operator=(const WriteResult&) = delete;
+ WriteResult(WriteResult&& other) noexcept;
+ WriteResult& operator=(WriteResult&& other) noexcept;
+
+ bool Available() const;
+
+ /// Wait for server acknowledgment of the write.
+ /// For fire-and-forget, simply let the WriteResult go out of scope.
+ Result Wait();
+
+private:
+ friend class AppendWriter;
+ WriteResult(ffi::WriteResult* inner) noexcept;
+
+ void Destroy() noexcept;
+ ffi::WriteResult* inner_{nullptr};
+};
+
class AppendWriter {
public:
AppendWriter() noexcept;
@@ -547,6 +573,7 @@ public:
bool Available() const;
Result Append(const GenericRow& row);
+ Result Append(const GenericRow& row, WriteResult& out);
Result Flush();
private:
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index afdb7c0..7944c10 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -190,6 +190,7 @@ mod ffi {
type Admin;
type Table;
type AppendWriter;
+ type WriteResult;
type LogScanner;
// Connection
@@ -253,9 +254,12 @@ mod ffi {
// AppendWriter
unsafe fn delete_append_writer(writer: *mut AppendWriter);
- fn append(self: &mut AppendWriter, row: &FfiGenericRow) -> FfiResult;
+ fn append(self: &mut AppendWriter, row: &FfiGenericRow) ->
Result<Box<WriteResult>>;
fn flush(self: &mut AppendWriter) -> FfiResult;
+ // WriteResult — dropped automatically via rust::Box, or call wait()
for ack
+ fn wait(self: &mut WriteResult) -> FfiResult;
+
// LogScanner
unsafe fn delete_log_scanner(scanner: *mut LogScanner);
fn subscribe(self: &LogScanner, bucket_id: i32, start_offset: i64) ->
FfiResult;
@@ -299,6 +303,10 @@ pub struct AppendWriter {
inner: fcore::client::AppendWriter,
}
+pub struct WriteResult {
+ inner: Option<fcore::client::WriteResultFuture>,
+}
+
pub struct LogScanner {
inner: Option<fcore::client::LogScanner>,
inner_batch: Option<fcore::client::RecordBatchLogScanner>,
@@ -750,15 +758,16 @@ unsafe fn delete_append_writer(writer: *mut AppendWriter)
{
}
impl AppendWriter {
- fn append(&mut self, row: &ffi::FfiGenericRow) -> ffi::FfiResult {
+ fn append(&mut self, row: &ffi::FfiGenericRow) -> Result<Box<WriteResult>,
String> {
let generic_row = types::ffi_row_to_core(row);
- let result = RUNTIME.block_on(async {
self.inner.append(&generic_row).await });
+ let result_future = RUNTIME
+ .block_on(async { self.inner.append(&generic_row).await })
+ .map_err(|e| format!("Failed to append: {e}"))?;
- match result {
- Ok(_) => ok_result(),
- Err(e) => err_result(1, e.to_string()),
- }
+ Ok(Box::new(WriteResult {
+ inner: Some(result_future),
+ }))
}
fn flush(&mut self) -> ffi::FfiResult {
@@ -771,6 +780,20 @@ impl AppendWriter {
}
}
+impl WriteResult {
+ fn wait(&mut self) -> ffi::FfiResult {
+ if let Some(future) = self.inner.take() {
+ let result = RUNTIME.block_on(future);
+ match result {
+ Ok(_) => ok_result(),
+ Err(e) => err_result(1, e.to_string()),
+ }
+ } else {
+ ok_result()
+ }
+ }
+}
+
// LogScanner implementation
unsafe fn delete_log_scanner(scanner: *mut LogScanner) {
if !scanner.is_null() {
diff --git a/bindings/cpp/src/table.cpp b/bindings/cpp/src/table.cpp
index efb762b..24be8d4 100644
--- a/bindings/cpp/src/table.cpp
+++ b/bindings/cpp/src/table.cpp
@@ -152,6 +152,45 @@ bool Table::HasPrimaryKey() const {
return table_->has_primary_key();
}
+// WriteResult implementation
+WriteResult::WriteResult() noexcept = default;
+
+WriteResult::WriteResult(ffi::WriteResult* inner) noexcept : inner_(inner) {}
+
+WriteResult::~WriteResult() noexcept { Destroy(); }
+
+void WriteResult::Destroy() noexcept {
+ if (inner_) {
+ // Reconstruct the rust::Box to let Rust drop the value
+ rust::Box<ffi::WriteResult>::from_raw(inner_);
+ inner_ = nullptr;
+ }
+}
+
+WriteResult::WriteResult(WriteResult&& other) noexcept : inner_(other.inner_) {
+ other.inner_ = nullptr;
+}
+
+WriteResult& WriteResult::operator=(WriteResult&& other) noexcept {
+ if (this != &other) {
+ Destroy();
+ inner_ = other.inner_;
+ other.inner_ = nullptr;
+ }
+ return *this;
+}
+
+bool WriteResult::Available() const { return inner_ != nullptr; }
+
+Result WriteResult::Wait() {
+ if (!Available()) {
+ return utils::make_ok();
+ }
+
+ auto ffi_result = inner_->wait();
+ return utils::from_ffi_result(ffi_result);
+}
+
// AppendWriter implementation
AppendWriter::AppendWriter() noexcept = default;
@@ -182,13 +221,25 @@ AppendWriter& AppendWriter::operator=(AppendWriter&&
other) noexcept {
bool AppendWriter::Available() const { return writer_ != nullptr; }
Result AppendWriter::Append(const GenericRow& row) {
+ WriteResult wr;
+ return Append(row, wr);
+}
+
+Result AppendWriter::Append(const GenericRow& row, WriteResult& out) {
if (!Available()) {
return utils::make_error(1, "AppendWriter not available");
}
- auto ffi_row = utils::to_ffi_generic_row(row);
- auto ffi_result = writer_->append(ffi_row);
- return utils::from_ffi_result(ffi_result);
+ try {
+ auto ffi_row = utils::to_ffi_generic_row(row);
+ auto rust_box = writer_->append(ffi_row);
+ out.inner_ = rust_box.into_raw();
+ return utils::make_ok();
+ } catch (const rust::Error& e) {
+ return utils::make_error(1, e.what());
+ } catch (const std::exception& e) {
+ return utils::make_error(1, e.what());
+ }
}
Result AppendWriter::Flush() {
diff --git a/bindings/python/example/example.py
b/bindings/python/example/example.py
index 8735038..1cabaa5 100644
--- a/bindings/python/example/example.py
+++ b/bindings/python/example/example.py
@@ -417,11 +417,13 @@ async def main():
print(f"Has primary key: {pk_table.has_primary_key()}")
# --- Test Upsert ---
- print("\n--- Testing Upsert ---")
+ print("\n--- Testing Upsert (fire-and-forget) ---")
try:
upsert_writer = pk_table.new_upsert()
print(f"Created upsert writer: {upsert_writer}")
+ # Fire-and-forget: queue writes without waiting for individual acks.
+ # Records are batched internally for efficiency.
await upsert_writer.upsert(
{
"user_id": 1,
@@ -437,7 +439,7 @@ async def main():
"balance": Decimal("1234.56"),
}
)
- print("Upserted user_id=1 (Alice)")
+ print("Queued user_id=1 (Alice)")
await upsert_writer.upsert(
{
@@ -452,7 +454,7 @@ async def main():
"balance": Decimal("5678.91"),
}
)
- print("Upserted user_id=2 (Bob)")
+ print("Queued user_id=2 (Bob)")
await upsert_writer.upsert(
{
@@ -467,10 +469,17 @@ async def main():
"balance": Decimal("9876.54"),
}
)
- print("Upserted user_id=3 (Charlie)")
+ print("Queued user_id=3 (Charlie)")
- # Update an existing row (same PK, different values)
- await upsert_writer.upsert(
+ # flush() waits for all queued writes to be acknowledged by the server
+ await upsert_writer.flush()
+ print("Flushed — all 3 rows acknowledged by server")
+
+ # Per-record acknowledgment: await the returned handle to block until
+ # the server confirms this specific write, useful when you need to
+ # read-after-write or verify critical updates.
+ print("\n--- Testing Upsert (per-record acknowledgment) ---")
+ ack = await upsert_writer.upsert(
{
"user_id": 1,
"name": "Alice Updated",
@@ -485,11 +494,8 @@ async def main():
"balance": Decimal("2345.67"),
}
)
- print("Updated user_id=1 (Alice -> Alice Updated)")
-
- # Explicit flush to ensure all upserts are acknowledged
- await upsert_writer.flush()
- print("Flushed all upserts")
+ await ack # wait for server acknowledgment before proceeding
+ print("Updated user_id=1 (Alice -> Alice Updated) — server
acknowledged")
except Exception as e:
print(f"Error during upsert: {e}")
@@ -548,13 +554,10 @@ async def main():
try:
upsert_writer = pk_table.new_upsert()
- # Delete only needs PK columns - much simpler API!
- await upsert_writer.delete({"user_id": 3})
- print("Deleted user_id=3")
-
- # Explicit flush to ensure delete is acknowledged
- await upsert_writer.flush()
- print("Flushed delete")
+ # Per-record ack for delete — await the handle to confirm deletion
+ ack = await upsert_writer.delete({"user_id": 3})
+ await ack
+ print("Deleted user_id=3 — server acknowledged")
lookuper = pk_table.new_lookup()
result = await lookuper.lookup({"user_id": 3})
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index 7184c8d..e987d43 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -526,35 +526,59 @@ pub struct AppendWriter {
#[pymethods]
impl AppendWriter {
- /// Write Arrow table data
+ /// Write Arrow table data (fire-and-forget, use flush() to ensure
delivery)
pub fn write_arrow(&self, py: Python, table: Py<PyAny>) -> PyResult<()> {
// Convert Arrow Table to batches and write each batch
let batches = table.call_method0(py, "to_batches")?;
let batch_list: Vec<Py<PyAny>> = batches.extract(py)?;
for batch in batch_list {
- self.write_arrow_batch(py, batch)?;
+ // Drop the ack coroutine — fire-and-forget
+ let _ = self.write_arrow_batch(py, batch)?;
}
Ok(())
}
/// Write Arrow batch data
- pub fn write_arrow_batch(&self, py: Python, batch: Py<PyAny>) ->
PyResult<()> {
+ ///
+ /// Returns:
+ /// A coroutine that can be awaited for server acknowledgment,
+ /// or ignored for fire-and-forget behavior.
+ pub fn write_arrow_batch<'py>(
+ &self,
+ py: Python<'py>,
+ batch: Py<PyAny>,
+ ) -> PyResult<Bound<'py, PyAny>> {
// This shares the underlying Arrow buffers without copying data
let batch_bound = batch.bind(py);
let rust_batch: ArrowRecordBatch =
FromPyArrow::from_pyarrow_bound(batch_bound)
.map_err(|e| FlussError::new_err(format!("Failed to convert
RecordBatch: {e}")))?;
let inner = self.inner.clone();
- // Release the GIL before blocking on async operation
- let result = py.detach(|| {
- TOKIO_RUNTIME.block_on(async {
inner.append_arrow_batch(rust_batch).await })
- });
- result.map_err(|e| FlussError::new_err(e.to_string()))
+ future_into_py(py, async move {
+ let result_future = inner
+ .append_arrow_batch(rust_batch)
+ .await
+ .map_err(|e| FlussError::new_err(e.to_string()))?;
+
+ Python::attach(|py| {
+ future_into_py(py, async move {
+ result_future
+ .await
+ .map_err(|e| FlussError::new_err(e.to_string()))?;
+ Ok(())
+ })
+ .map(|bound| bound.unbind())
+ })
+ })
}
/// Append a single row to the table
+ ///
+ /// Returns:
+ /// A coroutine that can be awaited for server acknowledgment,
+ /// or ignored for fire-and-forget behavior.
pub fn append<'py>(
&self,
py: Python<'py>,
@@ -564,10 +588,20 @@ impl AppendWriter {
let inner = self.inner.clone();
future_into_py(py, async move {
- inner
+ let result_future = inner
.append(&generic_row)
.await
- .map_err(|e| FlussError::new_err(e.to_string()))
+ .map_err(|e| FlussError::new_err(e.to_string()))?;
+
+ Python::attach(|py| {
+ future_into_py(py, async move {
+ result_future
+ .await
+ .map_err(|e| FlussError::new_err(e.to_string()))?;
+ Ok(())
+ })
+ .map(|bound| bound.unbind())
+ })
})
}
diff --git a/bindings/python/src/upsert.rs b/bindings/python/src/upsert.rs
index 08b3597..5c10ded 100644
--- a/bindings/python/src/upsert.rs
+++ b/bindings/python/src/upsert.rs
@@ -23,16 +23,21 @@ use tokio::sync::Mutex;
/// Writer for upserting and deleting data in a Fluss primary key table.
///
-/// Each upsert/delete operation is sent to the server and waits for
acknowledgment.
-/// Multiple concurrent writers share a common WriterClient which batches
requests
-/// for efficiency.
+/// Each upsert/delete operation queues the write and returns a coroutine
+/// that can be awaited for per-record acknowledgment, or ignored for
+/// fire-and-forget semantics (call `flush()` to ensure delivery).
///
/// # Example:
/// writer = table.new_upsert()
+///
+/// # Fire-and-forget with flush
/// await writer.upsert(row1)
/// await writer.upsert(row2)
-/// await writer.delete(pk)
-/// await writer.flush() # Ensures all pending operations are acknowledged
+/// await writer.flush()
+///
+/// # Or await individual acknowledgment
+/// ack = await writer.upsert(row3)
+/// await ack
#[pyclass]
pub struct UpsertWriter {
inner: Arc<UpsertWriterInner>,
@@ -58,7 +63,8 @@ impl UpsertWriter {
/// For list/tuple: values must be in schema order.
///
/// Returns:
- /// None on success
+ /// A coroutine that can be awaited for server acknowledgment,
+ /// or ignored for fire-and-forget behavior.
pub fn upsert<'py>(
&self,
py: Python<'py>,
@@ -70,11 +76,20 @@ impl UpsertWriter {
future_into_py(py, async move {
let mut guard = inner.get_or_create_writer().await?;
let writer = guard.as_mut().unwrap();
- writer
+ let result_future = writer
.upsert(&generic_row)
.await
.map_err(|e| FlussError::new_err(e.to_string()))?;
- Ok(())
+
+ Python::attach(|py| {
+ future_into_py(py, async move {
+ result_future
+ .await
+ .map_err(|e| FlussError::new_err(e.to_string()))?;
+ Ok(())
+ })
+ .map(|bound| bound.unbind())
+ })
})
}
@@ -86,7 +101,8 @@ impl UpsertWriter {
/// For list/tuple: values in PK column order.
///
/// Returns:
- /// None on success
+ /// A coroutine that can be awaited for server acknowledgment,
+ /// or ignored for fire-and-forget behavior.
pub fn delete<'py>(
&self,
py: Python<'py>,
@@ -98,11 +114,20 @@ impl UpsertWriter {
future_into_py(py, async move {
let mut guard = inner.get_or_create_writer().await?;
let writer = guard.as_mut().unwrap();
- writer
+ let result_future = writer
.delete(&generic_row)
.await
.map_err(|e| FlussError::new_err(e.to_string()))?;
- Ok(())
+
+ Python::attach(|py| {
+ future_into_py(py, async move {
+ result_future
+ .await
+ .map_err(|e| FlussError::new_err(e.to_string()))?;
+ Ok(())
+ })
+ .map(|bound| bound.unbind())
+ })
})
}
diff --git a/crates/examples/src/example_kv_table.rs
b/crates/examples/src/example_kv_table.rs
index 2bbcc74..2fcb134 100644
--- a/crates/examples/src/example_kv_table.rs
+++ b/crates/examples/src/example_kv_table.rs
@@ -65,6 +65,7 @@ pub async fn main() -> Result<()> {
upsert_writer.upsert(&row).await?;
println!("Upserted: {row:?}");
}
+ upsert_writer.flush().await?;
println!("\n=== Looking up ===");
let mut lookuper = table.new_lookup()?.create_lookuper()?;
@@ -84,7 +85,7 @@ pub async fn main() -> Result<()> {
row.set_field(0, 1);
row.set_field(1, "Verso");
row.set_field(2, 33i64);
- upsert_writer.upsert(&row).await?;
+ upsert_writer.upsert(&row).await?.await?;
println!("Updated: {row:?}");
let result = lookuper.lookup(&make_key(1)).await?;
@@ -99,7 +100,7 @@ pub async fn main() -> Result<()> {
// For delete, only primary key field needs to be set; other fields can
remain null
let mut row = GenericRow::new(3);
row.set_field(0, 2);
- upsert_writer.delete(&row).await?;
+ upsert_writer.delete(&row).await?.await?;
println!("Deleted row with id=2");
let result = lookuper.lookup(&make_key(2)).await?;
diff --git a/crates/examples/src/example_partitioned_kv_table.rs
b/crates/examples/src/example_partitioned_kv_table.rs
index 884869e..feb8f05 100644
--- a/crates/examples/src/example_partitioned_kv_table.rs
+++ b/crates/examples/src/example_partitioned_kv_table.rs
@@ -77,6 +77,7 @@ pub async fn main() -> Result<()> {
upsert_writer.upsert(&row).await?;
println!("Upserted: {row:?}");
}
+ upsert_writer.flush().await?;
println!("\n=== Looking up ===");
let mut lookuper = table.new_lookup()?.create_lookuper()?;
@@ -101,7 +102,7 @@ pub async fn main() -> Result<()> {
row.set_field(1, "APAC");
row.set_field(2, 1i64);
row.set_field(3, 4321i64);
- upsert_writer.upsert(&row).await?;
+ upsert_writer.upsert(&row).await?.await?;
println!("Updated: {row:?}");
let result = lookuper.lookup(&make_key(1001, "APAC", 1)).await?;
@@ -117,7 +118,7 @@ pub async fn main() -> Result<()> {
row.set_field(0, 1002);
row.set_field(1, "EMEA");
row.set_field(2, 2i64);
- upsert_writer.delete(&row).await?;
+ upsert_writer.delete(&row).await?.await?;
println!("Deleted: {row:?}");
let result = lookuper.lookup(&make_key(1002, "EMEA", 2)).await?;
diff --git a/crates/examples/src/example_table.rs
b/crates/examples/src/example_table.rs
index 733b13e..ee9bc7b 100644
--- a/crates/examples/src/example_table.rs
+++ b/crates/examples/src/example_table.rs
@@ -25,7 +25,6 @@ use fluss::error::Result;
use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
use fluss::row::{GenericRow, InternalRow};
use std::time::Duration;
-use tokio::try_join;
#[tokio::main]
pub async fn main() -> Result<()> {
@@ -64,13 +63,14 @@ pub async fn main() -> Result<()> {
let table = conn.get_table(&table_path).await?;
let append_writer = table.new_append()?.create_writer()?;
- let f1 = append_writer.append(&row);
+ // Fire-and-forget: queue writes then flush
+ append_writer.append(&row).await?;
let mut row = GenericRow::new(3);
row.set_field(0, 233333);
row.set_field(1, "tt44");
row.set_field(2, 987_654_321_987i64);
- let f2 = append_writer.append(&row);
- try_join!(f1, f2, append_writer.flush())?;
+ append_writer.append(&row).await?;
+ append_writer.flush().await?;
// scan rows
let log_scanner = table.new_scan().create_log_scanner()?;
diff --git a/crates/fluss/src/client/table/append.rs
b/crates/fluss/src/client/table/append.rs
index ace91a6..e26b61a 100644
--- a/crates/fluss/src/client/table/append.rs
+++ b/crates/fluss/src/client/table/append.rs
@@ -16,7 +16,7 @@
// under the License.
use crate::client::table::partition_getter::{PartitionGetter,
get_physical_path};
-use crate::client::{WriteRecord, WriterClient};
+use crate::client::{WriteRecord, WriteResultFuture, WriterClient};
use crate::error::Result;
use crate::metadata::{PhysicalTablePath, TableInfo, TablePath};
use crate::row::{ColumnarRow, InternalRow};
@@ -69,7 +69,18 @@ pub struct AppendWriter {
}
impl AppendWriter {
- pub async fn append<R: InternalRow>(&self, row: &R) -> Result<()> {
+ /// Appends a row to the table.
+ ///
+ /// This method returns a [`WriteResultFuture`] immediately after queueing
the write,
+ /// enabling fire-and-forget semantics for efficient batching.
+ ///
+ /// # Arguments
+ /// * row - the row to append.
+ ///
+ /// # Returns
+ /// A [`WriteResultFuture`] that can be awaited to wait for server
acknowledgment,
+ /// or dropped for fire-and-forget behavior (use `flush()` to ensure
delivery).
+ pub async fn append<R: InternalRow>(&self, row: &R) ->
Result<WriteResultFuture> {
let physical_table_path = Arc::new(get_physical_path(
&self.table_path,
self.partition_getter.as_ref(),
@@ -82,15 +93,21 @@ impl AppendWriter {
row,
);
let result_handle = self.writer_client.send(&record).await?;
- let result = result_handle.wait().await?;
- result_handle.result(result)
+ Ok(WriteResultFuture::new(result_handle))
}
/// Appends an Arrow RecordBatch to the table.
///
+ /// This method returns a [`WriteResultFuture`] immediately after queueing
the write,
+ /// enabling fire-and-forget semantics for efficient batching.
+ ///
/// For partitioned tables, the partition is derived from the **first
row** of the batch.
/// Callers must ensure all rows in the batch belong to the same partition.
- pub async fn append_arrow_batch(&self, batch: RecordBatch) -> Result<()> {
+ ///
+ /// # Returns
+ /// A [`WriteResultFuture`] that can be awaited to wait for server
acknowledgment,
+ /// or dropped for fire-and-forget behavior (use `flush()` to ensure
delivery).
+ pub async fn append_arrow_batch(&self, batch: RecordBatch) ->
Result<WriteResultFuture> {
let physical_table_path = if self.partition_getter.is_some() &&
batch.num_rows() > 0 {
let first_row = ColumnarRow::new(Arc::new(batch.clone()));
Arc::new(get_physical_path(
@@ -109,8 +126,7 @@ impl AppendWriter {
batch,
);
let result_handle = self.writer_client.send(&record).await?;
- let result = result_handle.wait().await?;
- result_handle.result(result)
+ Ok(WriteResultFuture::new(result_handle))
}
pub async fn flush(&self) -> Result<()> {
diff --git a/crates/fluss/src/client/table/upsert.rs
b/crates/fluss/src/client/table/upsert.rs
index 92f6a20..a1646cc 100644
--- a/crates/fluss/src/client/table/upsert.rs
+++ b/crates/fluss/src/client/table/upsert.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-use crate::client::{RowBytes, WriteFormat, WriteRecord, WriterClient};
+use crate::client::{RowBytes, WriteFormat, WriteRecord, WriteResultFuture,
WriterClient};
use crate::error::Error::{IllegalArgument, UnexpectedError};
use crate::error::Result;
use crate::metadata::{RowType, TableInfo, TablePath};
@@ -345,12 +345,16 @@ impl UpsertWriter {
/// Inserts row into Fluss table if they do not already exist, or updates
them if they do exist.
///
+ /// This method returns a [`WriteResultFuture`] immediately after queueing
the write,
+ /// enabling fire-and-forget semantics for efficient batching.
+ ///
/// # Arguments
/// * row - the row to upsert.
///
/// # Returns
- /// Ok(UpsertResult) when completed normally
- pub async fn upsert<R: InternalRow>(&self, row: &R) ->
Result<UpsertResult> {
+ /// A [`WriteResultFuture`] that can be awaited to wait for server
acknowledgment,
+ /// or dropped for fire-and-forget behavior (use `flush()` to ensure
delivery).
+ pub async fn upsert<R: InternalRow>(&self, row: &R) ->
Result<WriteResultFuture> {
self.check_field_count(row)?;
let (key, bucket_key) = self.get_keys(row)?;
@@ -376,20 +380,22 @@ impl UpsertWriter {
);
let result_handle = self.writer_client.send(&write_record).await?;
- let result = result_handle.wait().await?;
-
- result_handle.result(result).map(|_| UpsertResult)
+ Ok(WriteResultFuture::new(result_handle))
}
/// Delete certain row by the input row in Fluss table, the input row must
contain the primary
/// key.
///
+ /// This method returns a [`WriteResultFuture`] immediately after queueing
the delete,
+ /// enabling fire-and-forget semantics for efficient batching.
+ ///
/// # Arguments
- /// * row - the row to delete.
+ /// * row - the row to delete (must contain the primary key fields).
///
/// # Returns
- /// Ok(DeleteResult) when completed normally
- pub async fn delete<R: InternalRow>(&self, row: &R) ->
Result<DeleteResult> {
+ /// A [`WriteResultFuture`] that can be awaited to wait for server
acknowledgment,
+ /// or dropped for fire-and-forget behavior (use `flush()` to ensure
delivery).
+ pub async fn delete<R: InternalRow>(&self, row: &R) ->
Result<WriteResultFuture> {
self.check_field_count(row)?;
let (key, bucket_key) = self.get_keys(row)?;
@@ -410,9 +416,7 @@ impl UpsertWriter {
);
let result_handle = self.writer_client.send(&write_record).await?;
- let result = result_handle.wait().await?;
-
- result_handle.result(result).map(|_| DeleteResult)
+ Ok(WriteResultFuture::new(result_handle))
}
}
@@ -546,9 +550,11 @@ mod tests {
/// The result of upserting a record
/// Currently this is an empty struct to allow for compatible evolution in the
future
#[derive(Default)]
+#[allow(dead_code)]
pub struct UpsertResult;
/// The result of deleting a record
/// Currently this is an empty struct to allow for compatible evolution in the
future
#[derive(Default)]
+#[allow(dead_code)]
pub struct DeleteResult;
diff --git a/crates/fluss/src/client/write/mod.rs
b/crates/fluss/src/client/write/mod.rs
index 25a0db6..49eff05 100644
--- a/crates/fluss/src/client/write/mod.rs
+++ b/crates/fluss/src/client/write/mod.rs
@@ -26,7 +26,10 @@ use crate::row::InternalRow;
pub use accumulator::*;
use arrow::array::RecordBatch;
use bytes::Bytes;
+use std::future::Future;
+use std::pin::Pin;
use std::sync::Arc;
+use std::task::{Context, Poll};
pub(crate) mod broadcast;
mod bucket_assigner;
@@ -198,3 +201,41 @@ impl ResultHandle {
})
}
}
+
+/// A future that represents a pending write operation.
+///
+/// This type implements [`Future`], allowing users to either:
+/// 1. Await immediately to block on acknowledgment:
`writer.upsert(&row).await?.await?`
+/// 2. Fire-and-forget with later flush: `writer.upsert(&row).await?;
writer.flush().await?`
+///
+/// This pattern is similar to rdkafka's `DeliveryFuture` and allows for
efficient batching
+/// when users don't need immediate per-record acknowledgment.
+pub struct WriteResultFuture {
+ inner: Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>,
+}
+
+impl std::fmt::Debug for WriteResultFuture {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("WriteResultFuture").finish_non_exhaustive()
+ }
+}
+
+impl WriteResultFuture {
+ /// Create a new WriteResultFuture from a ResultHandle.
+ pub fn new(result_handle: ResultHandle) -> Self {
+ Self {
+ inner: Box::pin(async move {
+ let result = result_handle.wait().await?;
+ result_handle.result(result)
+ }),
+ }
+ }
+}
+
+impl Future for WriteResultFuture {
+ type Output = Result<(), Error>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) ->
Poll<Self::Output> {
+ self.inner.as_mut().poll(cx)
+ }
+}
diff --git a/crates/fluss/tests/integration/kv_table.rs
b/crates/fluss/tests/integration/kv_table.rs
index 87d90b0..0bfe4a3 100644
--- a/crates/fluss/tests/integration/kv_table.rs
+++ b/crates/fluss/tests/integration/kv_table.rs
@@ -95,7 +95,7 @@ mod kv_table_test {
let test_data = [(1, "Verso", 32i64), (2, "Noco", 25), (3, "Esquie",
35)];
- // Upsert rows
+ // Upsert rows (fire-and-forget, then flush)
for (id, name, age) in &test_data {
let mut row = GenericRow::new(3);
row.set_field(0, *id);
@@ -106,6 +106,7 @@ mod kv_table_test {
.await
.expect("Failed to upsert row");
}
+ upsert_writer.flush().await.expect("Failed to flush");
// Lookup records
let mut lookuper = table
@@ -130,7 +131,7 @@ mod kv_table_test {
assert_eq!(row.get_long(2), *expected_age, "age mismatch");
}
- // Update the record with new age
+ // Update the record with new age (await acknowledgment)
let mut updated_row = GenericRow::new(3);
updated_row.set_field(0, 1);
updated_row.set_field(1, "Verso");
@@ -138,7 +139,9 @@ mod kv_table_test {
upsert_writer
.upsert(&updated_row)
.await
- .expect("Failed to upsert updated row");
+ .expect("Failed to upsert updated row")
+ .await
+ .expect("Failed to wait for upsert acknowledgment");
// Verify the update
let result = lookuper
@@ -160,13 +163,15 @@ mod kv_table_test {
"Name should remain unchanged"
);
- // Delete record with id=1
+ // Delete record with id=1 (await acknowledgment)
let mut delete_row = GenericRow::new(3);
delete_row.set_field(0, 1);
upsert_writer
.delete(&delete_row)
.await
- .expect("Failed to delete");
+ .expect("Failed to delete")
+ .await
+ .expect("Failed to wait for delete acknowledgment");
// Verify deletion
let result = lookuper
@@ -265,6 +270,7 @@ mod kv_table_test {
row.set_field(2, *score);
upsert_writer.upsert(&row).await.expect("Failed to upsert");
}
+ upsert_writer.flush().await.expect("Failed to flush");
// Lookup with composite key
let mut lookuper = table
@@ -295,7 +301,7 @@ mod kv_table_test {
.expect("Row should exist");
assert_eq!(row.get_long(2), 250, "Score for (EU, 2) should be 250");
- // Update (US, 1) score
+ // Update (US, 1) score (await acknowledgment)
let mut update_row = GenericRow::new(3);
update_row.set_field(0, "US");
update_row.set_field(1, 1);
@@ -303,7 +309,9 @@ mod kv_table_test {
upsert_writer
.upsert(&update_row)
.await
- .expect("Failed to update");
+ .expect("Failed to update")
+ .await
+ .expect("Failed to wait for update acknowledgment");
// Verify update
let mut key = GenericRow::new(3);
@@ -372,7 +380,9 @@ mod kv_table_test {
upsert_writer
.upsert(&row)
.await
- .expect("Failed to upsert initial row");
+ .expect("Failed to upsert initial row")
+ .await
+ .expect("Failed to wait for upsert acknowledgment");
// Verify initial record
let mut lookuper = table
@@ -403,7 +413,7 @@ mod kv_table_test {
.create_writer()
.expect("Failed to create UpsertWriter with partial write");
- // Update only the score column
+ // Update only the score column (await acknowledgment)
let mut partial_row = GenericRow::new(4);
partial_row.set_field(0, 1);
partial_row.set_field(1, Datum::Null); // not in partial update column
@@ -412,7 +422,9 @@ mod kv_table_test {
partial_writer
.upsert(&partial_row)
.await
- .expect("Failed to upsert");
+ .expect("Failed to upsert")
+ .await
+ .expect("Failed to wait for upsert acknowledgment");
// Verify partial update - name and age should remain unchanged
let result = lookuper
@@ -499,6 +511,7 @@ mod kv_table_test {
row.set_field(3, *score);
upsert_writer.upsert(&row).await.expect("Failed to upsert");
}
+ upsert_writer.flush().await.expect("Failed to flush");
// Create lookuper
let mut lookuper = table
@@ -525,7 +538,7 @@ mod kv_table_test {
assert_eq!(row.get_long(3), *expected_score, "score mismatch");
}
- // Test update within a partition
+ // Test update within a partition (await acknowledgment)
let mut updated_row = GenericRow::new(4);
updated_row.set_field(0, "US");
updated_row.set_field(1, 1);
@@ -534,7 +547,9 @@ mod kv_table_test {
upsert_writer
.upsert(&updated_row)
.await
- .expect("Failed to upsert updated row");
+ .expect("Failed to upsert updated row")
+ .await
+ .expect("Failed to wait for upsert acknowledgment");
// Verify the update
let mut key = GenericRow::new(4);
@@ -564,14 +579,16 @@ mod kv_table_test {
"Lookup in non-existent partition should return None"
);
- // Delete a record within a partition
+ // Delete a record within a partition (await acknowledgment)
let mut delete_key = GenericRow::new(4);
delete_key.set_field(0, "EU");
delete_key.set_field(1, 1);
upsert_writer
.delete(&delete_key)
.await
- .expect("Failed to delete");
+ .expect("Failed to delete")
+ .await
+ .expect("Failed to wait for delete acknowledgment");
// Verify deletion
let mut key = GenericRow::new(4);
@@ -705,7 +722,9 @@ mod kv_table_test {
upsert_writer
.upsert(&row)
.await
- .expect("Failed to upsert row with all datatypes");
+ .expect("Failed to upsert row with all datatypes")
+ .await
+ .expect("Failed to wait for upsert acknowledgment");
// Lookup the record
let mut lookuper = table
@@ -808,7 +827,9 @@ mod kv_table_test {
upsert_writer
.upsert(&row_with_nulls)
.await
- .expect("Failed to upsert row with nulls");
+ .expect("Failed to upsert row with nulls")
+ .await
+ .expect("Failed to wait for upsert acknowledgment");
// Lookup row with nulls
let mut key2 = GenericRow::new(17);
diff --git a/crates/fluss/tests/integration/log_table.rs
b/crates/fluss/tests/integration/log_table.rs
index 493bb34..8d7773d 100644
--- a/crates/fluss/tests/integration/log_table.rs
+++ b/crates/fluss/tests/integration/log_table.rs
@@ -109,6 +109,7 @@ mod table_test {
.await
.expect("Failed to append batch");
+ // Flush to ensure all writes are acknowledged
append_writer.flush().await.expect("Failed to flush");
// Create scanner to verify appended records
@@ -232,6 +233,9 @@ mod table_test {
.await
.expect("Failed to append batch");
+ // Flush to ensure all writes are acknowledged
+ append_writer.flush().await.expect("Failed to flush");
+
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
let after_append_ms = Timestamp::now().as_millisecond();