This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new b4fad4b feat: KvWriteBatch wiring in Sender (#184)
b4fad4b is described below
commit b4fad4bea896d0e5ee1df4b2ed0a751dda46728f
Author: Keith Lee <[email protected]>
AuthorDate: Wed Jan 21 01:50:55 2026 +0000
feat: KvWriteBatch wiring in Sender (#184)
---
crates/examples/src/example_kv_table.rs | 4 +-
crates/fluss/src/client/write/batch.rs | 4 +
crates/fluss/src/client/write/sender.rs | 248 ++++++++++++++++++++++++--------
crates/fluss/src/rpc/message/mod.rs | 1 +
4 files changed, 197 insertions(+), 60 deletions(-)
diff --git a/crates/examples/src/example_kv_table.rs b/crates/examples/src/example_kv_table.rs
index 75821a3..dcf7db8 100644
--- a/crates/examples/src/example_kv_table.rs
+++ b/crates/examples/src/example_kv_table.rs
@@ -69,7 +69,7 @@ pub async fn main() -> Result<()> {
println!("\n=== Looking up ===");
let mut lookuper = table.new_lookup()?.create_lookuper()?;
- for id in 1..=2 {
+ for id in 1..=3 {
let result = lookuper.lookup(&make_key(id)).await?;
let row = result.get_single_row()?.unwrap();
println!(
@@ -98,6 +98,8 @@ pub async fn main() -> Result<()> {
println!("\n=== Deleting ===");
let mut row = GenericRow::new();
row.set_field(0, 2);
+ row.set_field(1, "");
+ row.set_field(2, 0i64);
upsert_writer.delete(&row).await?;
println!("Deleted: {row:?}");
diff --git a/crates/fluss/src/client/write/batch.rs b/crates/fluss/src/client/write/batch.rs
index 2ddf519..159e313 100644
--- a/crates/fluss/src/client/write/batch.rs
+++ b/crates/fluss/src/client/write/batch.rs
@@ -336,6 +336,10 @@ impl KvWriteBatch {
pub fn close(&mut self) -> Result<()> {
self.kv_batch_builder.close()
}
+
+    /// Returns the target column indices recorded for this batch, or `None`
+    /// when the batch was created without explicit target columns.
+    pub fn target_columns(&self) -> Option<&Arc<Vec<usize>>> {
+        Option::as_ref(&self.target_columns)
+    }
}
#[cfg(test)]
diff --git a/crates/fluss/src/client/write/sender.rs b/crates/fluss/src/client/write/sender.rs
index 7ea24e3..ceed245 100644
--- a/crates/fluss/src/client/write/sender.rs
+++ b/crates/fluss/src/client/write/sender.rs
@@ -17,11 +17,16 @@
use crate::client::broadcast;
use crate::client::metadata::Metadata;
+use crate::client::write::batch::WriteBatch;
use crate::client::{ReadyWriteBatch, RecordAccumulator};
+use crate::error::Error::UnexpectedError;
use crate::error::{FlussError, Result};
use crate::metadata::{TableBucket, TablePath};
-use crate::proto::ProduceLogResponse;
-use crate::rpc::message::ProduceLogRequest;
+use crate::proto::{
+ PbProduceLogRespForBucket, PbPutKvRespForBucket, ProduceLogResponse,
PutKvResponse,
+};
+use crate::rpc::ServerConnection;
+use crate::rpc::message::{ProduceLogRequest, PutKvRequest};
use log::warn;
use parking_lot::Mutex;
use std::collections::{HashMap, HashSet};
@@ -182,23 +187,22 @@ impl Sender {
.iter()
.filter_map(|bucket| records_by_bucket.remove(bucket))
.collect();
+
if request_batches.is_empty() {
continue;
}
- let request = match ProduceLogRequest::new(
+
+ let write_request = match Self::build_write_request(
table_id,
acks,
self.max_request_timeout_ms,
&mut request_batches,
) {
- Ok(request) => request,
+ Ok(req) => req,
Err(e) => {
self.handle_batches_with_local_error(
- table_buckets
- .iter()
- .filter_map(|bucket|
records_by_bucket.remove(bucket))
- .collect(),
- format!("Failed to build produce request: {e}"),
+ request_batches,
+ format!("Failed to build write request: {e}"),
)
.await?;
continue;
@@ -211,27 +215,12 @@ impl Sender {
records_by_bucket.insert(request_batch.table_bucket.clone(),
request_batch);
}
- let response = match connection.request(request).await {
- Ok(response) => response,
- Err(e) => {
- self.handle_batches_with_error(
- table_buckets
- .iter()
- .filter_map(|bucket|
records_by_bucket.remove(bucket))
- .collect(),
- FlussError::NetworkException,
- format!("Failed to send produce request: {e}"),
- )
- .await?;
- continue;
- }
- };
-
- self.handle_produce_response(
+ self.send_and_handle_response(
+ &connection,
+ write_request,
table_id,
&table_buckets,
&mut records_by_bucket,
- response,
)
.await?;
}
@@ -239,50 +228,120 @@ impl Sender {
Ok(())
}
- async fn handle_produce_response(
+    /// Builds the RPC request for one group of ready batches.
+    ///
+    /// The request kind follows the batches: `WriteBatch::ArrowLog` batches become
+    /// a `ProduceLogRequest` and `WriteBatch::Kv` batches become a `PutKvRequest`.
+    /// Every batch in the group must be of the same kind. KV batches must also share
+    /// the same target columns, because a `PutKvRequest` is built with a single
+    /// target-column list.
+    ///
+    /// # Errors
+    ///
+    /// Returns `UnexpectedError` if any of the following holds:
+    /// - `request_batches` is empty;
+    /// - it mixes log and KV batches;
+    /// - it contains KV batches with differing target columns;
+    /// - a target-column index does not fit in an `i32`.
+    ///
+    /// Errors from the request constructors are propagated unchanged.
+    fn build_write_request(
+        table_id: i64,
+        acks: i16,
+        timeout_ms: i32,
+        request_batches: &mut [ReadyWriteBatch],
+    ) -> Result<WriteRequest> {
+        // An empty group has nothing to send. Report it as an error instead of
+        // panicking, so the caller can route it through its local-error handling.
+        let Some(first_batch) = request_batches.first() else {
+            return Err(UnexpectedError {
+                message: "Cannot build a write request without any write batches".to_string(),
+                source: None,
+            });
+        };
+
+        let request = match &first_batch.write_batch {
+            WriteBatch::ArrowLog(_) => {
+                // Mirror the homogeneity check of the KV path, so a mixed group is
+                // never silently sent as a produce-log request.
+                if request_batches
+                    .iter()
+                    .skip(1)
+                    .any(|batch| matches!(batch.write_batch, WriteBatch::Kv(_)))
+                {
+                    return Err(UnexpectedError {
+                        message: "Expecting ArrowLogWriteBatch but found KvWriteBatch"
+                            .to_string(),
+                        source: None,
+                    });
+                }
+                let req = ProduceLogRequest::new(table_id, acks, timeout_ms, request_batches)?;
+                WriteRequest::ProduceLog(req)
+            }
+            WriteBatch::Kv(kv_write_batch) => {
+                let target_columns = kv_write_batch.target_columns();
+                for batch in request_batches.iter().skip(1) {
+                    match &batch.write_batch {
+                        WriteBatch::ArrowLog(_) => {
+                            return Err(UnexpectedError {
+                                message: "Expecting KvWriteBatch but found ArrowLogWriteBatch"
+                                    .to_string(),
+                                source: None,
+                            });
+                        }
+                        WriteBatch::Kv(kvb) if kvb.target_columns() != target_columns => {
+                            return Err(UnexpectedError {
+                                message: format!(
+                                    "All the write batches to make put kv request should have the same target columns, but got {:?} and {:?}.",
+                                    target_columns,
+                                    kvb.target_columns()
+                                ),
+                                source: None,
+                            });
+                        }
+                        WriteBatch::Kv(_) => {}
+                    }
+                }
+                // PutKvRequest takes target columns as i32. Reject an out-of-range
+                // index instead of letting an `as` cast silently wrap it.
+                let cols = match target_columns {
+                    Some(columns) => columns
+                        .iter()
+                        .map(|&col| {
+                            i32::try_from(col).map_err(|_| UnexpectedError {
+                                message: format!(
+                                    "Target column index {col} does not fit in an i32"
+                                ),
+                                source: None,
+                            })
+                        })
+                        .collect::<Result<Vec<_>>>()?,
+                    None => Vec::new(),
+                };
+                let req = PutKvRequest::new(table_id, acks, timeout_ms, cols, request_batches)?;
+                WriteRequest::PutKv(req)
+            }
+        };
+
+        Ok(request)
+    }
+
+    /// Sends `write_request` over `connection` and routes the outcome.
+    ///
+    /// If the server answers, the response goes to `handle_write_response`, which
+    /// completes or fails the batches tracked in `records_by_bucket` for
+    /// `table_buckets`. If the request itself fails, every batch still tracked for
+    /// `table_buckets` is removed from `records_by_bucket` and failed with
+    /// `FlussError::NetworkException`.
+    async fn send_and_handle_response(
+        &self,
+        connection: &ServerConnection,
+        write_request: WriteRequest,
+        table_id: i64,
+        table_buckets: &[TableBucket],
+        records_by_bucket: &mut HashMap<TableBucket, ReadyWriteBatch>,
+    ) -> Result<()> {
+        // Each request variant yields a different response type, so this macro
+        // stamps out the shared send-and-route logic once per variant.
+        macro_rules! dispatch {
+            ($req:expr) => {{
+                let outcome = connection.request($req).await;
+                match outcome {
+                    Err(e) => {
+                        let unsent = table_buckets
+                            .iter()
+                            .filter_map(|bucket| records_by_bucket.remove(bucket))
+                            .collect();
+                        self.handle_batches_with_error(
+                            unsent,
+                            FlussError::NetworkException,
+                            format!("Failed to send write request: {e}"),
+                        )
+                        .await
+                    }
+                    Ok(response) => {
+                        self.handle_write_response(
+                            table_id,
+                            table_buckets,
+                            records_by_bucket,
+                            response,
+                        )
+                        .await
+                    }
+                }
+            }};
+        }
+
+        match write_request {
+            WriteRequest::PutKv(req) => dispatch!(req),
+            WriteRequest::ProduceLog(req) => dispatch!(req),
+        }
+    }
+
+ async fn handle_write_response<R: WriteResponse>(
&self,
table_id: i64,
request_buckets: &[TableBucket],
records_by_bucket: &mut HashMap<TableBucket, ReadyWriteBatch>,
- response: ProduceLogResponse,
+ response: R,
) -> Result<()> {
let mut invalid_metadata_tables: HashSet<TablePath> = HashSet::new();
let mut pending_buckets: HashSet<TableBucket> =
request_buckets.iter().cloned().collect();
- for produce_log_response_for_bucket in response.buckets_resp.iter() {
- let tb = TableBucket::new(table_id,
produce_log_response_for_bucket.bucket_id);
+ for bucket_resp in response.buckets_resp() {
+ let tb = TableBucket::new(table_id, bucket_resp.bucket_id());
let Some(ready_batch) = records_by_bucket.remove(&tb) else {
panic!("Missing ready batch for table bucket {tb}");
};
pending_buckets.remove(&tb);
- if let Some(error_code) =
produce_log_response_for_bucket.error_code {
- if error_code == FlussError::None.code() {
- self.complete_batch(ready_batch);
- continue;
- }
-
- let error = FlussError::for_code(error_code);
- let message = produce_log_response_for_bucket
- .error_message
- .clone()
- .unwrap_or_else(|| error.message().to_string());
- if let Some(table_path) = self
- .handle_write_batch_error(ready_batch, error, message)
- .await?
- {
- invalid_metadata_tables.insert(table_path);
- }
- } else {
- self.complete_batch(ready_batch)
- }
- }
- if !pending_buckets.is_empty() {
- for bucket in pending_buckets {
- if let Some(ready_batch) = records_by_bucket.remove(&bucket) {
- let message =
- format!("Missing response for table bucket {bucket} in
produce response.");
- let error = FlussError::UnknownServerError;
+ match bucket_resp.error_code() {
+ Some(code) if code != FlussError::None.code() => {
+ let error = FlussError::for_code(code);
+ let message = bucket_resp
+ .error_message()
+ .cloned()
+ .unwrap_or_else(|| error.message().to_string());
if let Some(table_path) = self
.handle_write_batch_error(ready_batch, error, message)
.await?
@@ -290,8 +349,25 @@ impl Sender {
invalid_metadata_tables.insert(table_path);
}
}
+ _ => self.complete_batch(ready_batch),
}
}
+
+ for bucket in pending_buckets {
+ if let Some(ready_batch) = records_by_bucket.remove(&bucket) {
+ if let Some(table_path) = self
+ .handle_write_batch_error(
+ ready_batch,
+ FlussError::UnknownServerError,
+ format!("Missing response for table bucket {bucket}"),
+ )
+ .await?
+ {
+ invalid_metadata_tables.insert(table_path);
+ }
+ }
+ }
+
self.update_metadata_if_needed(invalid_metadata_tables)
.await;
Ok(())
@@ -450,6 +526,60 @@ impl Sender {
}
}
+/// The request built for one group of ready batches by `build_write_request`.
+/// `WriteBatch::ArrowLog` batches go out as a produce-log request and
+/// `WriteBatch::Kv` batches as a put-kv request.
+enum WriteRequest {
+ /// Request carrying `WriteBatch::ArrowLog` batches.
+ ProduceLog(ProduceLogRequest),
+ /// Request carrying `WriteBatch::Kv` batches that share one target-column list.
+ PutKv(PutKvRequest),
+}
+
+/// Uniform read access to one per-bucket entry of a write response. It lets
+/// `handle_write_response` process produce-log and put-kv responses with the
+/// same code.
+trait BucketResponse {
+ /// Id of the bucket this entry reports on.
+ fn bucket_id(&self) -> i32;
+ /// Error code reported for the bucket, if any. `handle_write_response` treats
+ /// `None` and `FlussError::None`'s code as success.
+ fn error_code(&self) -> Option<i32>;
+ /// Error message reported for the bucket, if any.
+ fn error_message(&self) -> Option<&String>;
+}
+
+/// Implements `BucketResponse` for protobuf per-bucket response types that
+/// expose `bucket_id`, `error_code` and `error_message` fields.
+macro_rules! impl_bucket_response {
+    ($($resp:ty),+ $(,)?) => {
+        $(
+            impl BucketResponse for $resp {
+                fn bucket_id(&self) -> i32 {
+                    self.bucket_id
+                }
+
+                fn error_code(&self) -> Option<i32> {
+                    self.error_code
+                }
+
+                fn error_message(&self) -> Option<&String> {
+                    Option::as_ref(&self.error_message)
+                }
+            }
+        )+
+    };
+}
+
+impl_bucket_response!(PbProduceLogRespForBucket, PbPutKvRespForBucket);
+
+/// A write RPC response that carries per-bucket result entries.
+trait WriteResponse {
+ /// Type of a single per-bucket entry in this response.
+ type BucketResp: BucketResponse;
+ /// All per-bucket entries contained in the response.
+ fn buckets_resp(&self) -> &[Self::BucketResp];
+}
+
+// Both protobuf responses keep their per-bucket results in a `buckets_resp`
+// field. Exposing it as a slice lets `handle_write_response` treat them alike.
+impl WriteResponse for ProduceLogResponse {
+    type BucketResp = PbProduceLogRespForBucket;
+
+    fn buckets_resp(&self) -> &[Self::BucketResp] {
+        &self.buckets_resp[..]
+    }
+}
+
+impl WriteResponse for PutKvResponse {
+    type BucketResp = PbPutKvRespForBucket;
+
+    fn buckets_resp(&self) -> &[Self::BucketResp] {
+        &self.buckets_resp[..]
+    }
+}
+
#[cfg(test)]
mod tests {
use super::*;
@@ -563,7 +693,7 @@ mod tests {
};
sender
- .handle_produce_response(1, &request_buckets, &mut
records_by_bucket, response)
+ .handle_write_response(1, &request_buckets, &mut
records_by_bucket, response)
.await?;
let batch_result = handle.wait().await?;
diff --git a/crates/fluss/src/rpc/message/mod.rs b/crates/fluss/src/rpc/message/mod.rs
index 4e6c8e1..881a64f 100644
--- a/crates/fluss/src/rpc/message/mod.rs
+++ b/crates/fluss/src/rpc/message/mod.rs
@@ -57,6 +57,7 @@ pub use list_offsets::*;
pub use list_tables::*;
pub use lookup::*;
pub use produce_log::*;
+pub use put_kv::*;
pub use table_exists::*;
pub use update_metadata::*;