This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new b4fad4b  feat: KvWriteBatch wiring in Sender (#184)
b4fad4b is described below

commit b4fad4bea896d0e5ee1df4b2ed0a751dda46728f
Author: Keith Lee <[email protected]>
AuthorDate: Wed Jan 21 01:50:55 2026 +0000

    feat: KvWriteBatch wiring in Sender (#184)
---
 crates/examples/src/example_kv_table.rs |   4 +-
 crates/fluss/src/client/write/batch.rs  |   4 +
 crates/fluss/src/client/write/sender.rs | 248 ++++++++++++++++++++++++--------
 crates/fluss/src/rpc/message/mod.rs     |   1 +
 4 files changed, 197 insertions(+), 60 deletions(-)

diff --git a/crates/examples/src/example_kv_table.rs b/crates/examples/src/example_kv_table.rs
index 75821a3..dcf7db8 100644
--- a/crates/examples/src/example_kv_table.rs
+++ b/crates/examples/src/example_kv_table.rs
@@ -69,7 +69,7 @@ pub async fn main() -> Result<()> {
     println!("\n=== Looking up ===");
     let mut lookuper = table.new_lookup()?.create_lookuper()?;
 
-    for id in 1..=2 {
+    for id in 1..=3 {
         let result = lookuper.lookup(&make_key(id)).await?;
         let row = result.get_single_row()?.unwrap();
         println!(
@@ -98,6 +98,8 @@ pub async fn main() -> Result<()> {
     println!("\n=== Deleting ===");
     let mut row = GenericRow::new();
     row.set_field(0, 2);
+    row.set_field(1, "");
+    row.set_field(2, 0i64);
     upsert_writer.delete(&row).await?;
     println!("Deleted: {row:?}");
 
diff --git a/crates/fluss/src/client/write/batch.rs b/crates/fluss/src/client/write/batch.rs
index 2ddf519..159e313 100644
--- a/crates/fluss/src/client/write/batch.rs
+++ b/crates/fluss/src/client/write/batch.rs
@@ -336,6 +336,10 @@ impl KvWriteBatch {
     pub fn close(&mut self) -> Result<()> {
         self.kv_batch_builder.close()
     }
+
+    pub fn target_columns(&self) -> Option<&Arc<Vec<usize>>> {
+        self.target_columns.as_ref()
+    }
 }
 
 #[cfg(test)]
diff --git a/crates/fluss/src/client/write/sender.rs b/crates/fluss/src/client/write/sender.rs
index 7ea24e3..ceed245 100644
--- a/crates/fluss/src/client/write/sender.rs
+++ b/crates/fluss/src/client/write/sender.rs
@@ -17,11 +17,16 @@
 
 use crate::client::broadcast;
 use crate::client::metadata::Metadata;
+use crate::client::write::batch::WriteBatch;
 use crate::client::{ReadyWriteBatch, RecordAccumulator};
+use crate::error::Error::UnexpectedError;
 use crate::error::{FlussError, Result};
 use crate::metadata::{TableBucket, TablePath};
-use crate::proto::ProduceLogResponse;
-use crate::rpc::message::ProduceLogRequest;
+use crate::proto::{
+    PbProduceLogRespForBucket, PbPutKvRespForBucket, ProduceLogResponse, PutKvResponse,
+};
+use crate::rpc::ServerConnection;
+use crate::rpc::message::{ProduceLogRequest, PutKvRequest};
 use log::warn;
 use parking_lot::Mutex;
 use std::collections::{HashMap, HashSet};
@@ -182,23 +187,22 @@ impl Sender {
                 .iter()
                 .filter_map(|bucket| records_by_bucket.remove(bucket))
                 .collect();
+
             if request_batches.is_empty() {
                 continue;
             }
-            let request = match ProduceLogRequest::new(
+
+            let write_request = match Self::build_write_request(
                 table_id,
                 acks,
                 self.max_request_timeout_ms,
                 &mut request_batches,
             ) {
-                Ok(request) => request,
+                Ok(req) => req,
                 Err(e) => {
                     self.handle_batches_with_local_error(
-                        table_buckets
-                            .iter()
-                            .filter_map(|bucket| records_by_bucket.remove(bucket))
-                            .collect(),
-                        format!("Failed to build produce request: {e}"),
+                        request_batches,
+                        format!("Failed to build write request: {e}"),
                     )
                     .await?;
                     continue;
@@ -211,27 +215,12 @@ impl Sender {
                records_by_bucket.insert(request_batch.table_bucket.clone(), request_batch);
             }
 
-            let response = match connection.request(request).await {
-                Ok(response) => response,
-                Err(e) => {
-                    self.handle_batches_with_error(
-                        table_buckets
-                            .iter()
-                            .filter_map(|bucket| records_by_bucket.remove(bucket))
-                            .collect(),
-                        FlussError::NetworkException,
-                        format!("Failed to send produce request: {e}"),
-                    )
-                    .await?;
-                    continue;
-                }
-            };
-
-            self.handle_produce_response(
+            self.send_and_handle_response(
+                &connection,
+                write_request,
                 table_id,
                 &table_buckets,
                 &mut records_by_bucket,
-                response,
             )
             .await?;
         }
@@ -239,50 +228,120 @@ impl Sender {
         Ok(())
     }
 
-    async fn handle_produce_response(
+    fn build_write_request(
+        table_id: i64,
+        acks: i16,
+        timeout_ms: i32,
+        request_batches: &mut [ReadyWriteBatch],
+    ) -> Result<WriteRequest> {
+        let first_batch = &request_batches.first().unwrap().write_batch;
+
+        let request = match first_batch {
+            WriteBatch::ArrowLog(_) => {
+                let req = ProduceLogRequest::new(table_id, acks, timeout_ms, request_batches)?;
+                WriteRequest::ProduceLog(req)
+            }
+            WriteBatch::Kv(kv_write_batch) => {
+                let target_columns = kv_write_batch.target_columns();
+                for batch in request_batches.iter().skip(1) {
+                    match &batch.write_batch {
+                        WriteBatch::ArrowLog(_) => {
+                            return Err(UnexpectedError {
+                                message: "Expecting KvWriteBatch but found ArrowLogWriteBatch"
+                                    .to_string(),
+                                source: None,
+                            });
+                        }
+                        WriteBatch::Kv(kvb) => {
+                            if target_columns != kvb.target_columns() {
+                                return Err(UnexpectedError {
+                                    message: format!(
+                                        "All the write batches to make put kv request should have the same target columns, but got {:?} and {:?}.",
+                                        target_columns,
+                                        kvb.target_columns()
+                                    ),
+                                    source: None,
+                                });
+                            }
+                        }
+                    }
+                }
+                let cols = target_columns
+                    .map(|arc| arc.iter().map(|&c| c as i32).collect())
+                    .unwrap_or_default();
+                let req = PutKvRequest::new(table_id, acks, timeout_ms, cols, request_batches)?;
+                WriteRequest::PutKv(req)
+            }
+        };
+
+        Ok(request)
+    }
+
+    async fn send_and_handle_response(
+        &self,
+        connection: &ServerConnection,
+        write_request: WriteRequest,
+        table_id: i64,
+        table_buckets: &[TableBucket],
+        records_by_bucket: &mut HashMap<TableBucket, ReadyWriteBatch>,
+    ) -> Result<()> {
+        macro_rules! send {
+            ($request:expr) => {
+                match connection.request($request).await {
+                    Ok(response) => {
+                        self.handle_write_response(
+                            table_id,
+                            table_buckets,
+                            records_by_bucket,
+                            response,
+                        )
+                        .await
+                    }
+                    Err(e) => {
+                        self.handle_batches_with_error(
+                            table_buckets
+                                .iter()
+                                .filter_map(|b| records_by_bucket.remove(b))
+                                .collect(),
+                            FlussError::NetworkException,
+                            format!("Failed to send write request: {e}"),
+                        )
+                        .await
+                    }
+                }
+            };
+        }
+
+        match write_request {
+            WriteRequest::ProduceLog(req) => send!(req),
+            WriteRequest::PutKv(req) => send!(req),
+        }
+    }
+
+    async fn handle_write_response<R: WriteResponse>(
         &self,
         table_id: i64,
         request_buckets: &[TableBucket],
         records_by_bucket: &mut HashMap<TableBucket, ReadyWriteBatch>,
-        response: ProduceLogResponse,
+        response: R,
     ) -> Result<()> {
         let mut invalid_metadata_tables: HashSet<TablePath> = HashSet::new();
         let mut pending_buckets: HashSet<TableBucket> = request_buckets.iter().cloned().collect();
-        for produce_log_response_for_bucket in response.buckets_resp.iter() {
-            let tb = TableBucket::new(table_id, produce_log_response_for_bucket.bucket_id);
 
+        for bucket_resp in response.buckets_resp() {
+            let tb = TableBucket::new(table_id, bucket_resp.bucket_id());
             let Some(ready_batch) = records_by_bucket.remove(&tb) else {
                 panic!("Missing ready batch for table bucket {tb}");
             };
             pending_buckets.remove(&tb);
 
-            if let Some(error_code) = produce_log_response_for_bucket.error_code {
-                if error_code == FlussError::None.code() {
-                    self.complete_batch(ready_batch);
-                    continue;
-                }
-
-                let error = FlussError::for_code(error_code);
-                let message = produce_log_response_for_bucket
-                    .error_message
-                    .clone()
-                    .unwrap_or_else(|| error.message().to_string());
-                if let Some(table_path) = self
-                    .handle_write_batch_error(ready_batch, error, message)
-                    .await?
-                {
-                    invalid_metadata_tables.insert(table_path);
-                }
-            } else {
-                self.complete_batch(ready_batch)
-            }
-        }
-        if !pending_buckets.is_empty() {
-            for bucket in pending_buckets {
-                if let Some(ready_batch) = records_by_bucket.remove(&bucket) {
-                    let message =
-                        format!("Missing response for table bucket {bucket} in produce response.");
-                    let error = FlussError::UnknownServerError;
+            match bucket_resp.error_code() {
+                Some(code) if code != FlussError::None.code() => {
+                    let error = FlussError::for_code(code);
+                    let message = bucket_resp
+                        .error_message()
+                        .cloned()
+                        .unwrap_or_else(|| error.message().to_string());
                     if let Some(table_path) = self
                         .handle_write_batch_error(ready_batch, error, message)
                         .await?
@@ -290,8 +349,25 @@ impl Sender {
                         invalid_metadata_tables.insert(table_path);
                     }
                 }
+                _ => self.complete_batch(ready_batch),
             }
         }
+
+        for bucket in pending_buckets {
+            if let Some(ready_batch) = records_by_bucket.remove(&bucket) {
+                if let Some(table_path) = self
+                    .handle_write_batch_error(
+                        ready_batch,
+                        FlussError::UnknownServerError,
+                        format!("Missing response for table bucket {bucket}"),
+                    )
+                    .await?
+                {
+                    invalid_metadata_tables.insert(table_path);
+                }
+            }
+        }
+
         self.update_metadata_if_needed(invalid_metadata_tables)
             .await;
         Ok(())
@@ -450,6 +526,60 @@ impl Sender {
     }
 }
 
+enum WriteRequest {
+    ProduceLog(ProduceLogRequest),
+    PutKv(PutKvRequest),
+}
+
+trait BucketResponse {
+    fn bucket_id(&self) -> i32;
+    fn error_code(&self) -> Option<i32>;
+    fn error_message(&self) -> Option<&String>;
+}
+
+impl BucketResponse for PbProduceLogRespForBucket {
+    fn bucket_id(&self) -> i32 {
+        self.bucket_id
+    }
+    fn error_code(&self) -> Option<i32> {
+        self.error_code
+    }
+    fn error_message(&self) -> Option<&String> {
+        self.error_message.as_ref()
+    }
+}
+
+impl BucketResponse for PbPutKvRespForBucket {
+    fn bucket_id(&self) -> i32 {
+        self.bucket_id
+    }
+    fn error_code(&self) -> Option<i32> {
+        self.error_code
+    }
+    fn error_message(&self) -> Option<&String> {
+        self.error_message.as_ref()
+    }
+}
+
+trait WriteResponse {
+    type BucketResp: BucketResponse;
+    fn buckets_resp(&self) -> &[Self::BucketResp];
+}
+
+impl WriteResponse for ProduceLogResponse {
+    type BucketResp = PbProduceLogRespForBucket;
+    fn buckets_resp(&self) -> &[Self::BucketResp] {
+        &self.buckets_resp
+    }
+}
+
+impl WriteResponse for PutKvResponse {
+    type BucketResp = PbPutKvRespForBucket;
+    fn buckets_resp(&self) -> &[Self::BucketResp] {
+        &self.buckets_resp
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -563,7 +693,7 @@ mod tests {
         };
 
         sender
-            .handle_produce_response(1, &request_buckets, &mut records_by_bucket, response)
+            .handle_write_response(1, &request_buckets, &mut records_by_bucket, response)
             .await?;
 
         let batch_result = handle.wait().await?;
diff --git a/crates/fluss/src/rpc/message/mod.rs b/crates/fluss/src/rpc/message/mod.rs
index 4e6c8e1..881a64f 100644
--- a/crates/fluss/src/rpc/message/mod.rs
+++ b/crates/fluss/src/rpc/message/mod.rs
@@ -57,6 +57,7 @@ pub use list_offsets::*;
 pub use list_tables::*;
 pub use lookup::*;
 pub use produce_log::*;
+pub use put_kv::*;
 pub use table_exists::*;
 pub use update_metadata::*;
 

Reply via email to