This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 86c7fe5  feat: introduce KvWriteBatch and PutKvRequest (#176)
86c7fe5 is described below

commit 86c7fe5522141af88066ba2555bdc75d3be7d297
Author: yuxia Luo <[email protected]>
AuthorDate: Sun Jan 18 22:09:25 2026 +0800

    feat: introduce KvWriteBatch and PutKvRequest (#176)
---
 bindings/python/src/table.rs                       |  11 +-
 crates/fluss/build.rs                              |   5 +-
 crates/fluss/src/client/table/append.rs            |  11 +-
 crates/fluss/src/client/table/log_fetch_buffer.rs  |   2 +-
 crates/fluss/src/client/table/scanner.rs           |   3 +-
 crates/fluss/src/client/table/writer.rs            |   5 +-
 crates/fluss/src/client/write/accumulator.rs       |   7 +-
 crates/fluss/src/client/write/batch.rs             | 133 ++++++++++++++++++---
 crates/fluss/src/client/write/mod.rs               |  79 ++++++++++--
 crates/fluss/src/client/write/sender.rs            |  15 ++-
 crates/fluss/src/proto/fluss_api.proto             |  28 +++++
 crates/fluss/src/record/arrow.rs                   |  17 ++-
 .../fluss/src/record/kv/kv_record_batch_builder.rs |  25 ++--
 .../fluss/src/record/kv/kv_record_read_context.rs  |   5 +-
 crates/fluss/src/rpc/api_key.rs                    |   4 +
 crates/fluss/src/rpc/message/mod.rs                |   1 +
 crates/fluss/src/rpc/message/produce_log.rs        |   2 +-
 crates/fluss/src/rpc/message/put_kv.rs             |  73 +++++++++++
 18 files changed, 360 insertions(+), 66 deletions(-)

diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index db85c51..773354e 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -340,8 +340,7 @@ fn python_to_generic_row(
             .map(|n| n.to_string())
             .unwrap_or_else(|_| "unknown".to_string());
         FlussError::new_err(format!(
-            "Row must be a dict, list, or tuple; got {}",
-            type_name
+            "Row must be a dict, list, or tuple; got {type_name}"
         ))
     })?;
     let schema = table_info.row_type();
@@ -357,7 +356,7 @@ fn python_to_generic_row(
                         .name()
                         .map(|n| n.to_string())
                         .unwrap_or_else(|_| "unknown".to_string());
-                    FlussError::new_err(format!("Row dict keys must be 
strings; got {}", key_type))
+                    FlussError::new_err(format!("Row dict keys must be 
strings; got {key_type}"))
                 })?;
 
                 if fields.iter().all(|f| f.name() != key_str) {
@@ -367,8 +366,7 @@ fn python_to_generic_row(
                         .collect::<Vec<_>>()
                         .join(", ");
                     return Err(FlussError::new_err(format!(
-                        "Unknown field '{}'. Expected fields: {}",
-                        key_str, expected
+                        "Unknown field '{key_str}'. Expected fields: 
{expected}"
                     )));
                 }
             }
@@ -476,8 +474,7 @@ fn python_value_to_datum(
             }
         }
         _ => Err(FlussError::new_err(format!(
-            "Unsupported data type for row-level operations: {:?}",
-            data_type
+            "Unsupported data type for row-level operations: {data_type}"
         ))),
     }
 }
diff --git a/crates/fluss/build.rs b/crates/fluss/build.rs
index 1564313..265208a 100644
--- a/crates/fluss/build.rs
+++ b/crates/fluss/build.rs
@@ -19,7 +19,10 @@ use std::io::Result;
 
 fn main() -> Result<()> {
     let mut config = prost_build::Config::new();
-    config.bytes([".proto.PbProduceLogReqForBucket.records"]);
+    config.bytes([
+        ".proto.PbProduceLogReqForBucket.records",
+        ".proto.PbPutKvReqForBucket.records",
+    ]);
     config.compile_protos(&["src/proto/fluss_api.proto"], &["src/proto"])?;
     Ok(())
 }
diff --git a/crates/fluss/src/client/table/append.rs 
b/crates/fluss/src/client/table/append.rs
index ad3e55e..6d76f28 100644
--- a/crates/fluss/src/client/table/append.rs
+++ b/crates/fluss/src/client/table/append.rs
@@ -46,6 +46,7 @@ impl TableAppend {
         AppendWriter {
             table_path: Arc::new(self.table_path.clone()),
             writer_client: self.writer_client.clone(),
+            table_info: Arc::new(self.table_info.clone()),
         }
     }
 }
@@ -53,18 +54,24 @@ impl TableAppend {
 pub struct AppendWriter {
     table_path: Arc<TablePath>,
     writer_client: Arc<WriterClient>,
+    table_info: Arc<TableInfo>,
 }
 
 impl AppendWriter {
     pub async fn append(&self, row: GenericRow<'_>) -> Result<()> {
-        let record = WriteRecord::new(self.table_path.clone(), row);
+        let record =
+            WriteRecord::for_append(self.table_path.clone(), 
self.table_info.schema_id, row);
         let result_handle = self.writer_client.send(&record).await?;
         let result = result_handle.wait().await?;
         result_handle.result(result)
     }
 
     pub async fn append_arrow_batch(&self, batch: RecordBatch) -> Result<()> {
-        let record = WriteRecord::new_record_batch(self.table_path.clone(), 
batch);
+        let record = WriteRecord::for_append_record_batch(
+            self.table_path.clone(),
+            self.table_info.schema_id,
+            batch,
+        );
         let result_handle = self.writer_client.send(&record).await?;
         let result = result_handle.wait().await?;
         result_handle.result(result)
diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs 
b/crates/fluss/src/client/table/log_fetch_buffer.rs
index fb6981f..ac44cc1 100644
--- a/crates/fluss/src/client/table/log_fetch_buffer.rs
+++ b/crates/fluss/src/client/table/log_fetch_buffer.rs
@@ -733,7 +733,7 @@ mod tests {
         let mut row = GenericRow::new();
         row.set_field(0, 1_i32);
         row.set_field(1, "alice");
-        let record = WriteRecord::new(table_path, row);
+        let record = WriteRecord::for_append(table_path, 1, row);
         builder.append(&record)?;
 
         let data = builder.build()?;
diff --git a/crates/fluss/src/client/table/scanner.rs 
b/crates/fluss/src/client/table/scanner.rs
index 3e7d61f..e9b2ce1 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -1446,8 +1446,9 @@ mod tests {
                 compression_level: DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
             },
         );
-        let record = WriteRecord::new(
+        let record = WriteRecord::for_append(
             table_path,
+            1,
             GenericRow {
                 values: vec![Datum::Int32(1)],
             },
diff --git a/crates/fluss/src/client/table/writer.rs 
b/crates/fluss/src/client/table/writer.rs
index b2ba881..8a83b5e 100644
--- a/crates/fluss/src/client/table/writer.rs
+++ b/crates/fluss/src/client/table/writer.rs
@@ -43,6 +43,7 @@ pub struct AbstractTableWriter {
     table_path: Arc<TablePath>,
     writer_client: Arc<WriterClient>,
     field_count: i32,
+    schema_id: i32,
 }
 
 #[allow(dead_code)]
@@ -57,6 +58,7 @@ impl AbstractTableWriter {
             table_path: Arc::new(table_path),
             writer_client,
             field_count: table_info.row_type().fields().len() as i32,
+            schema_id: table_info.schema_id,
         }
     }
 
@@ -82,7 +84,8 @@ pub struct AppendWriterImpl {
 #[allow(dead_code)]
 impl AppendWriterImpl {
     pub async fn append(&self, row: GenericRow<'_>) -> Result<()> {
-        let record = WriteRecord::new(self.base.table_path.clone(), row);
+        let record =
+            WriteRecord::for_append(self.base.table_path.clone(), 
self.base.schema_id, row);
         self.base.send(&record).await
     }
 }
diff --git a/crates/fluss/src/client/write/accumulator.rs 
b/crates/fluss/src/client/write/accumulator.rs
index 83f11ab..0afc9d4 100644
--- a/crates/fluss/src/client/write/accumulator.rs
+++ b/crates/fluss/src/client/write/accumulator.rs
@@ -17,7 +17,7 @@
 
 use crate::client::write::batch::WriteBatch::ArrowLog;
 use crate::client::write::batch::{ArrowLogWriteBatch, WriteBatch};
-use crate::client::{Record, ResultHandle, WriteRecord};
+use crate::client::{LogWriteRecord, Record, ResultHandle, WriteRecord};
 use crate::cluster::{BucketLocation, Cluster, ServerNode};
 use crate::config::Config;
 use crate::error::Result;
@@ -110,7 +110,7 @@ impl RecordAccumulator {
             row_type,
             bucket_id,
             current_time_ms(),
-            matches!(record.row, Record::RecordBatch(_)),
+            matches!(&record.record, 
Record::Log(LogWriteRecord::RecordBatch(_))),
         ));
 
         let batch_id = batch.batch_id();
@@ -541,8 +541,9 @@ mod tests {
         let accumulator = RecordAccumulator::new(config);
         let table_path = Arc::new(TablePath::new("db".to_string(), 
"tbl".to_string()));
         let cluster = Arc::new(build_cluster(table_path.as_ref(), 1, 1));
-        let record = WriteRecord::new(
+        let record = WriteRecord::for_append(
             table_path.clone(),
+            1,
             GenericRow {
                 values: vec![Datum::Int32(1)],
             },
diff --git a/crates/fluss/src/client/write/batch.rs 
b/crates/fluss/src/client/write/batch.rs
index 1f54226..0159753 100644
--- a/crates/fluss/src/client/write/batch.rs
+++ b/crates/fluss/src/client/write/batch.rs
@@ -17,13 +17,13 @@
 
 use crate::BucketId;
 use crate::client::broadcast::{BatchWriteResult, BroadcastOnce};
-use crate::client::{ResultHandle, WriteRecord};
+use crate::client::{Record, ResultHandle, WriteRecord};
 use crate::compression::ArrowCompressionInfo;
-use crate::error::Result;
-use crate::metadata::{DataType, TablePath};
+use crate::error::{Error, Result};
+use crate::metadata::{DataType, KvFormat, TablePath};
 use crate::record::MemoryLogRecordsArrowBuilder;
+use crate::record::kv::KvRecordBatchBuilder;
 use bytes::Bytes;
-use parking_lot::Mutex;
 use std::cmp::max;
 use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
 
@@ -92,18 +92,28 @@ impl InnerWriteBatch {
 
 pub enum WriteBatch {
     ArrowLog(ArrowLogWriteBatch),
+    Kv(KvWriteBatch),
 }
 
 impl WriteBatch {
     pub fn inner_batch(&self) -> &InnerWriteBatch {
         match self {
             WriteBatch::ArrowLog(batch) => &batch.write_batch,
+            WriteBatch::Kv(batch) => &batch.write_batch,
+        }
+    }
+
+    pub fn inner_batch_mut(&mut self) -> &mut InnerWriteBatch {
+        match self {
+            WriteBatch::ArrowLog(batch) => &mut batch.write_batch,
+            WriteBatch::Kv(batch) => &mut batch.write_batch,
         }
     }
 
     pub fn try_append(&mut self, write_record: &WriteRecord) -> 
Result<Option<ResultHandle>> {
         match self {
             WriteBatch::ArrowLog(batch) => batch.try_append(write_record),
+            WriteBatch::Kv(batch) => batch.try_append(write_record),
         }
     }
 
@@ -111,11 +121,13 @@ impl WriteBatch {
         self.inner_batch().waited_time_ms(now)
     }
 
-    pub fn close(&mut self) {
+    pub fn close(&mut self) -> Result<()> {
         match self {
             WriteBatch::ArrowLog(batch) => {
                 batch.close();
+                Ok(())
             }
+            WriteBatch::Kv(batch) => batch.close(),
         }
     }
 
@@ -127,20 +139,18 @@ impl WriteBatch {
     pub fn is_closed(&self) -> bool {
         match self {
             WriteBatch::ArrowLog(batch) => batch.is_closed(),
+            WriteBatch::Kv(batch) => batch.is_closed(),
         }
     }
 
     pub fn drained(&mut self, now_ms: i64) {
-        match self {
-            WriteBatch::ArrowLog(batch) => {
-                batch.write_batch.drained(now_ms);
-            }
-        }
+        self.inner_batch_mut().drained(now_ms);
     }
 
-    pub fn build(&self) -> Result<Bytes> {
+    pub fn build(&mut self) -> Result<Bytes> {
         match self {
             WriteBatch::ArrowLog(batch) => batch.build(),
+            WriteBatch::Kv(batch) => batch.build(),
         }
     }
 
@@ -172,7 +182,7 @@ impl WriteBatch {
 pub struct ArrowLogWriteBatch {
     pub write_batch: InnerWriteBatch,
     pub arrow_builder: MemoryLogRecordsArrowBuilder,
-    built_records: Mutex<Option<Bytes>>,
+    built_records: Option<Bytes>,
 }
 
 impl ArrowLogWriteBatch {
@@ -196,7 +206,7 @@ impl ArrowLogWriteBatch {
                 to_append_record_batch,
                 arrow_compression_info,
             ),
-            built_records: Mutex::new(None),
+            built_records: None,
         }
     }
 
@@ -218,13 +228,12 @@ impl ArrowLogWriteBatch {
         }
     }
 
-    pub fn build(&self) -> Result<Bytes> {
-        let mut cached = self.built_records.lock();
-        if let Some(bytes) = cached.as_ref() {
+    pub fn build(&mut self) -> Result<Bytes> {
+        if let Some(bytes) = &self.built_records {
             return Ok(bytes.clone());
         }
         let bytes = Bytes::from(self.arrow_builder.build()?);
-        *cached = Some(bytes.clone());
+        self.built_records = Some(bytes.clone());
         Ok(bytes)
     }
 
@@ -237,6 +246,96 @@ impl ArrowLogWriteBatch {
     }
 }
 
+pub struct KvWriteBatch {
+    write_batch: InnerWriteBatch,
+    kv_batch_builder: KvRecordBatchBuilder,
+    target_columns: Option<Vec<usize>>,
+    schema_id: i32,
+}
+
+impl KvWriteBatch {
+    #[allow(clippy::too_many_arguments)]
+    pub fn new(
+        batch_id: i64,
+        table_path: TablePath,
+        schema_id: i32,
+        write_limit: usize,
+        kv_format: KvFormat,
+        bucket_id: BucketId,
+        target_columns: Option<Vec<usize>>,
+        create_ms: i64,
+    ) -> Self {
+        let base = InnerWriteBatch::new(batch_id, table_path, create_ms, 
bucket_id);
+        Self {
+            write_batch: base,
+            kv_batch_builder: KvRecordBatchBuilder::new(schema_id, 
write_limit, kv_format),
+            target_columns,
+            schema_id,
+        }
+    }
+
+    pub fn try_append(&mut self, write_record: &WriteRecord) -> 
Result<Option<ResultHandle>> {
+        let kv_write_record = match &write_record.record {
+            Record::Kv(record) => record,
+            _ => {
+                return Err(Error::UnsupportedOperation {
+                    message: "Only KvRecord to append to KvWriteBatch 
".to_string(),
+                });
+            }
+        };
+
+        let key = kv_write_record.key;
+
+        if self.schema_id != write_record.schema_id {
+            return Err(Error::UnexpectedError {
+                message: format!(
+                    "schema id {} of the write record to append is not the 
same as the current schema id {} in the batch.",
+                    write_record.schema_id, self.schema_id
+                ),
+                source: None,
+            });
+        };
+
+        if self.target_columns.as_deref() != kv_write_record.target_columns {
+            return Err(Error::UnexpectedError {
+                message: format!(
+                    "target columns {:?} of the write record to append are not 
the same as the current target columns {:?} in the batch.",
+                    kv_write_record.target_columns,
+                    self.target_columns.as_deref()
+                ),
+                source: None,
+            });
+        }
+
+        let row = kv_write_record.compacted_row.as_ref();
+
+        if self.is_closed() || !self.kv_batch_builder.has_room_for_row(key, 
row) {
+            Ok(None)
+        } else {
+            // append successfully
+            self.kv_batch_builder
+                .append_row(key, row)
+                .map_err(|e| Error::UnexpectedError {
+                    message: "Failed to append row to 
KvWriteBatch".to_string(),
+                    source: Some(Box::new(e)),
+                })?;
+            Ok(Some(ResultHandle::new(self.write_batch.results.receiver())))
+        }
+    }
+
+    pub fn build(&mut self) -> Result<Bytes> {
+        self.kv_batch_builder.build()
+    }
+
+    pub fn is_closed(&self) -> bool {
+        self.kv_batch_builder.is_closed()
+    }
+
+    pub fn close(&mut self) -> Result<()> {
+        self.kv_batch_builder.close()
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/crates/fluss/src/client/write/mod.rs 
b/crates/fluss/src/client/write/mod.rs
index 00a71c5..248218e 100644
--- a/crates/fluss/src/client/write/mod.rs
+++ b/crates/fluss/src/client/write/mod.rs
@@ -21,7 +21,7 @@ mod batch;
 use crate::client::broadcast::{self as client_broadcast, BatchWriteResult, 
BroadcastOnceReceiver};
 use crate::error::Error;
 use crate::metadata::TablePath;
-use crate::row::GenericRow;
+use crate::row::{CompactedRow, GenericRow};
 pub use accumulator::*;
 use arrow::array::RecordBatch;
 use std::sync::Arc;
@@ -36,28 +36,91 @@ mod writer_client;
 pub use write_format::WriteFormat;
 pub use writer_client::WriterClient;
 
+#[allow(dead_code)]
 pub struct WriteRecord<'a> {
-    pub row: Record<'a>,
-    pub table_path: Arc<TablePath>,
+    record: Record<'a>,
+    table_path: Arc<TablePath>,
+    bucket_key: Option<&'a [u8]>,
+    schema_id: i32,
+    write_format: WriteFormat,
+}
+
+impl<'a> WriteRecord<'a> {
+    pub fn record(&self) -> &Record<'a> {
+        &self.record
+    }
 }
 
 pub enum Record<'a> {
-    Row(GenericRow<'a>),
+    Log(LogWriteRecord<'a>),
+    Kv(KvWriteRecord<'a>),
+}
+
+pub enum LogWriteRecord<'a> {
+    Generic(GenericRow<'a>),
     RecordBatch(Arc<RecordBatch>),
 }
 
+pub struct KvWriteRecord<'a> {
+    // only valid for primary key table
+    key: &'a [u8],
+    target_columns: Option<&'a [usize]>,
+    compacted_row: Option<CompactedRow<'a>>,
+}
+
+impl<'a> KvWriteRecord<'a> {
+    fn new(
+        key: &'a [u8],
+        target_columns: Option<&'a [usize]>,
+        compacted_row: Option<CompactedRow<'a>>,
+    ) -> Self {
+        KvWriteRecord {
+            key,
+            target_columns,
+            compacted_row,
+        }
+    }
+}
+
 impl<'a> WriteRecord<'a> {
-    pub fn new(table_path: Arc<TablePath>, row: GenericRow<'a>) -> Self {
+    pub fn for_append(table_path: Arc<TablePath>, schema_id: i32, row: 
GenericRow<'a>) -> Self {
+        Self {
+            record: Record::Log(LogWriteRecord::Generic(row)),
+            table_path,
+            bucket_key: None,
+            schema_id,
+            write_format: WriteFormat::ArrowLog,
+        }
+    }
+
+    pub fn for_append_record_batch(
+        table_path: Arc<TablePath>,
+        schema_id: i32,
+        row: RecordBatch,
+    ) -> Self {
         Self {
-            row: Record::Row(row),
+            record: Record::Log(LogWriteRecord::RecordBatch(Arc::new(row))),
             table_path,
+            bucket_key: None,
+            schema_id,
+            write_format: WriteFormat::ArrowLog,
         }
     }
 
-    pub fn new_record_batch(table_path: Arc<TablePath>, row: RecordBatch) -> 
Self {
+    pub fn for_upsert(
+        table_path: Arc<TablePath>,
+        schema_id: i32,
+        bucket_key: &'a [u8],
+        key: &'a [u8],
+        target_columns: Option<&'a [usize]>,
+        row: CompactedRow<'a>,
+    ) -> Self {
         Self {
-            row: Record::RecordBatch(Arc::new(row)),
+            record: Record::Kv(KvWriteRecord::new(key, target_columns, 
Some(row))),
             table_path,
+            bucket_key: Some(bucket_key),
+            schema_id,
+            write_format: WriteFormat::CompactedKv,
         }
     }
 }
diff --git a/crates/fluss/src/client/write/sender.rs 
b/crates/fluss/src/client/write/sender.rs
index ffac0af..7ea24e3 100644
--- a/crates/fluss/src/client/write/sender.rs
+++ b/crates/fluss/src/client/write/sender.rs
@@ -178,9 +178,9 @@ impl Sender {
         };
 
         for (table_id, table_buckets) in write_batch_by_table {
-            let request_batches: Vec<&ReadyWriteBatch> = table_buckets
+            let mut request_batches: Vec<ReadyWriteBatch> = table_buckets
                 .iter()
-                .filter_map(|bucket| records_by_bucket.get(bucket))
+                .filter_map(|bucket| records_by_bucket.remove(bucket))
                 .collect();
             if request_batches.is_empty() {
                 continue;
@@ -189,7 +189,7 @@ impl Sender {
                 table_id,
                 acks,
                 self.max_request_timeout_ms,
-                request_batches.as_slice(),
+                &mut request_batches,
             ) {
                 Ok(request) => request,
                 Err(e) => {
@@ -205,6 +205,12 @@ impl Sender {
                 }
             };
 
+            // let's put them back into records_by_bucket,
+            // since the response handling will use them.
+            for request_batch in request_batches {
+                records_by_bucket.insert(request_batch.table_bucket.clone(), 
request_batch);
+            }
+
             let response = match connection.request(request).await {
                 Ok(response) => response,
                 Err(e) => {
@@ -462,8 +468,9 @@ mod tests {
         cluster: Arc<Cluster>,
         table_path: Arc<TablePath>,
     ) -> Result<(ReadyWriteBatch, crate::client::ResultHandle)> {
-        let record = WriteRecord::new(
+        let record = WriteRecord::for_append(
             table_path,
+            1,
             GenericRow {
                 values: vec![Datum::Int32(1)],
             },
diff --git a/crates/fluss/src/proto/fluss_api.proto 
b/crates/fluss/src/proto/fluss_api.proto
index b4ae840..eaee94c 100644
--- a/crates/fluss/src/proto/fluss_api.proto
+++ b/crates/fluss/src/proto/fluss_api.proto
@@ -119,6 +119,34 @@ message PbProduceLogRespForBucket {
   optional int64 base_offset = 5;
 }
 
+// put kv request and response
+message PutKvRequest {
+  required int32 acks = 1;
+  required int64 table_id = 2;
+  required int32 timeout_ms = 3;
+  // the indexes for the columns to write,
+  // if empty, means write all columns
+  repeated int32 target_columns = 4 [packed = true];
+  repeated PbPutKvReqForBucket buckets_req = 5;
+}
+
+message PutKvResponse {
+  repeated PbPutKvRespForBucket buckets_resp = 1;
+}
+
+message PbPutKvReqForBucket {
+  optional int64 partition_id = 1;
+  required int32 bucket_id = 2;
+  required bytes records = 3;
+}
+
+message PbPutKvRespForBucket {
+  optional int64 partition_id = 1;
+  required int32 bucket_id = 2;
+  optional int32 error_code = 3;
+  optional string error_message = 4;
+}
+
 message CreateTableRequest {
   required PbTablePath table_path = 1;
   required bytes table_json = 2;
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index b331ae9..aa48376 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::client::{Record, WriteRecord};
+use crate::client::{LogWriteRecord, Record, WriteRecord};
 use crate::compression::ArrowCompressionInfo;
 use crate::error::{Error, Result};
 use crate::metadata::DataType;
@@ -275,11 +275,16 @@ impl MemoryLogRecordsArrowBuilder {
     }
 
     pub fn append(&mut self, record: &WriteRecord) -> Result<bool> {
-        match &record.row {
-            Record::Row(row) => 
Ok(self.arrow_record_batch_builder.append(row)?),
-            Record::RecordBatch(record_batch) => Ok(self
-                .arrow_record_batch_builder
-                .append_batch(record_batch.clone())?),
+        match &record.record() {
+            Record::Log(log_write_record) => match log_write_record {
+                LogWriteRecord::Generic(row) => 
Ok(self.arrow_record_batch_builder.append(row)?),
+                LogWriteRecord::RecordBatch(record_batch) => Ok(self
+                    .arrow_record_batch_builder
+                    .append_batch(record_batch.clone())?),
+            },
+            Record::Kv(_) => Err(Error::UnsupportedOperation {
+                message: "Only LogRecord is supported to append".to_string(),
+            }),
         }
         // todo: consider write other change type
     }
diff --git a/crates/fluss/src/record/kv/kv_record_batch_builder.rs 
b/crates/fluss/src/record/kv/kv_record_batch_builder.rs
index 636104d..e3da864 100644
--- a/crates/fluss/src/record/kv/kv_record_batch_builder.rs
+++ b/crates/fluss/src/record/kv/kv_record_batch_builder.rs
@@ -19,9 +19,7 @@
 //!
 //! This module provides the KvRecordBatchBuilder for building batches of KV 
records.
 
-use bytes::{Bytes, BytesMut};
-use std::io;
-
+use crate::error::{Error, Result};
 use crate::metadata::KvFormat;
 use crate::record::kv::kv_record::KvRecord;
 use crate::record::kv::kv_record_batch::{
@@ -31,6 +29,8 @@ use crate::record::kv::kv_record_batch::{
 };
 use crate::record::kv::{CURRENT_KV_MAGIC_VALUE, NO_BATCH_SEQUENCE, 
NO_WRITER_ID};
 use crate::row::BinaryRow;
+use bytes::{Bytes, BytesMut};
+use std::io;
 
 /// Builder for KvRecordBatch.
 ///
@@ -185,11 +185,12 @@ impl KvRecordBatchBuilder {
     /// built bytes may change if mutations occur between builds.
     ///
     /// Note: [`close`](Self::close) prevents further appends but does not 
prevent writer state modifications.
-    pub fn build(&mut self) -> io::Result<Bytes> {
+    pub fn build(&mut self) -> Result<Bytes> {
         if self.aborted {
-            return Err(io::Error::other(
-                "Attempting to build an aborted record batch",
-            ));
+            return Err(Error::UnexpectedError {
+                message: "Attempting to build an aborted record 
batch".to_string(),
+                source: None,
+            });
         }
 
         if let Some(ref cached) = self.built_buffer {
@@ -225,11 +226,13 @@ impl KvRecordBatchBuilder {
 
     /// Close the builder.
     /// After closing, no more records can be appended, but the batch can 
still be built.
-    pub fn close(&mut self) -> io::Result<()> {
+    pub fn close(&mut self) -> Result<()> {
         if self.aborted {
-            return Err(io::Error::other(
-                "Cannot close KvRecordBatchBuilder as it has already been 
aborted",
-            ));
+            return Err(Error::UnexpectedError {
+                message: "Cannot close KvRecordBatchBuilder as it has already 
been aborted"
+                    .to_string(),
+                source: None,
+            });
         }
         self.is_closed = true;
         Ok(())
diff --git a/crates/fluss/src/record/kv/kv_record_read_context.rs 
b/crates/fluss/src/record/kv/kv_record_read_context.rs
index 2049c32..fe6c6f0 100644
--- a/crates/fluss/src/record/kv/kv_record_read_context.rs
+++ b/crates/fluss/src/record/kv/kv_record_read_context.rs
@@ -90,8 +90,7 @@ impl ReadContext for KvRecordReadContext {
             other => {
                 return Err(Error::IoUnexpectedError {
                     message: format!(
-                        "Schema {} has invalid row type: expected Row, got 
{:?}",
-                        schema_id, other
+                        "Schema {schema_id} has invalid row type: expected 
Row, got {other:?}"
                     ),
                     source: std::io::Error::new(
                         std::io::ErrorKind::InvalidData,
@@ -134,7 +133,7 @@ mod tests {
         fn new(data_types: Vec<crate::metadata::DataType>) -> Self {
             let mut builder = Schema::builder();
             for (i, dt) in data_types.iter().enumerate() {
-                builder = builder.column(&format!("field{}", i), dt.clone());
+                builder = builder.column(&format!("field{i}"), dt.clone());
             }
             let schema = builder.build().expect("Failed to build schema");
 
diff --git a/crates/fluss/src/rpc/api_key.rs b/crates/fluss/src/rpc/api_key.rs
index 9f9268e..66e4beb 100644
--- a/crates/fluss/src/rpc/api_key.rs
+++ b/crates/fluss/src/rpc/api_key.rs
@@ -30,6 +30,7 @@ pub enum ApiKey {
     TableExists,
     MetaData,
     ProduceLog,
+    PutKv,
     FetchLog,
     Lookup,
     ListOffsets,
@@ -54,6 +55,7 @@ impl From<i16> for ApiKey {
             1012 => ApiKey::MetaData,
             1014 => ApiKey::ProduceLog,
             1015 => ApiKey::FetchLog,
+            1016 => ApiKey::PutKv,
             1017 => ApiKey::Lookup,
             1021 => ApiKey::ListOffsets,
             1025 => ApiKey::GetFileSystemSecurityToken,
@@ -79,6 +81,7 @@ impl From<ApiKey> for i16 {
             ApiKey::MetaData => 1012,
             ApiKey::ProduceLog => 1014,
             ApiKey::FetchLog => 1015,
+            ApiKey::PutKv => 1016,
             ApiKey::Lookup => 1017,
             ApiKey::ListOffsets => 1021,
             ApiKey::GetFileSystemSecurityToken => 1025,
@@ -108,6 +111,7 @@ mod tests {
             (1012, ApiKey::MetaData),
             (1014, ApiKey::ProduceLog),
             (1015, ApiKey::FetchLog),
+            (1016, ApiKey::PutKv),
             (1017, ApiKey::Lookup),
             (1021, ApiKey::ListOffsets),
             (1025, ApiKey::GetFileSystemSecurityToken),
diff --git a/crates/fluss/src/rpc/message/mod.rs 
b/crates/fluss/src/rpc/message/mod.rs
index 2fe506b..4e6c8e1 100644
--- a/crates/fluss/src/rpc/message/mod.rs
+++ b/crates/fluss/src/rpc/message/mod.rs
@@ -36,6 +36,7 @@ mod list_offsets;
 mod list_tables;
 mod lookup;
 mod produce_log;
+mod put_kv;
 mod table_exists;
 mod update_metadata;
 
diff --git a/crates/fluss/src/rpc/message/produce_log.rs 
b/crates/fluss/src/rpc/message/produce_log.rs
index eb72575..dab7ea9 100644
--- a/crates/fluss/src/rpc/message/produce_log.rs
+++ b/crates/fluss/src/rpc/message/produce_log.rs
@@ -37,7 +37,7 @@ impl ProduceLogRequest {
         table_id: i64,
         ack: i16,
         max_request_timeout_ms: i32,
-        ready_batches: &[&ReadyWriteBatch],
+        ready_batches: &mut [ReadyWriteBatch],
     ) -> FlussResult<Self> {
         let mut request = proto::ProduceLogRequest {
             table_id,
diff --git a/crates/fluss/src/rpc/message/put_kv.rs 
b/crates/fluss/src/rpc/message/put_kv.rs
new file mode 100644
index 0000000..983faa6
--- /dev/null
+++ b/crates/fluss/src/rpc/message/put_kv.rs
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use crate::client::ReadyWriteBatch;
+use crate::proto::{PbPutKvReqForBucket, PutKvResponse};
+use crate::rpc::api_key::ApiKey;
+use crate::rpc::api_version::ApiVersion;
+use crate::rpc::frame::ReadError;
+use crate::rpc::frame::WriteError;
+use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
+use crate::{impl_read_version_type, impl_write_version_type, proto};
+use bytes::{Buf, BufMut};
+use prost::Message;
+
+#[allow(dead_code)]
+pub struct PutKvRequest {
+    pub inner_request: proto::PutKvRequest,
+}
+
+#[allow(dead_code)]
+impl PutKvRequest {
+    pub fn new(
+        table_id: i64,
+        ack: i16,
+        max_request_timeout_ms: i32,
+        target_columns: Vec<i32>,
+        ready_batches: &mut [ReadyWriteBatch],
+    ) -> crate::error::Result<Self> {
+        let mut request = proto::PutKvRequest {
+            table_id,
+            acks: ack as i32,
+            timeout_ms: max_request_timeout_ms,
+            target_columns,
+            ..Default::default()
+        };
+        for ready_batch in ready_batches {
+            request.buckets_req.push(PbPutKvReqForBucket {
+                partition_id: ready_batch.table_bucket.partition_id(),
+                bucket_id: ready_batch.table_bucket.bucket_id(),
+                records: ready_batch.write_batch.build()?,
+            })
+        }
+
+        Ok(PutKvRequest {
+            inner_request: request,
+        })
+    }
+}
+
+impl RequestBody for PutKvRequest {
+    type ResponseBody = PutKvResponse;
+
+    const API_KEY: ApiKey = ApiKey::PutKv;
+
+    const REQUEST_VERSION: ApiVersion = ApiVersion(0);
+}
+
+impl_write_version_type!(PutKvRequest);
+impl_read_version_type!(PutKvResponse);

Reply via email to