This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 86c7fe5 feat: introduce KvWriteBatch and PutKvRequest (#176)
86c7fe5 is described below
commit 86c7fe5522141af88066ba2555bdc75d3be7d297
Author: yuxia Luo <[email protected]>
AuthorDate: Sun Jan 18 22:09:25 2026 +0800
feat: introduce KvWriteBatch and PutKvRequest (#176)
---
bindings/python/src/table.rs | 11 +-
crates/fluss/build.rs | 5 +-
crates/fluss/src/client/table/append.rs | 11 +-
crates/fluss/src/client/table/log_fetch_buffer.rs | 2 +-
crates/fluss/src/client/table/scanner.rs | 3 +-
crates/fluss/src/client/table/writer.rs | 5 +-
crates/fluss/src/client/write/accumulator.rs | 7 +-
crates/fluss/src/client/write/batch.rs | 133 ++++++++++++++++++---
crates/fluss/src/client/write/mod.rs | 79 ++++++++++--
crates/fluss/src/client/write/sender.rs | 15 ++-
crates/fluss/src/proto/fluss_api.proto | 28 +++++
crates/fluss/src/record/arrow.rs | 17 ++-
.../fluss/src/record/kv/kv_record_batch_builder.rs | 25 ++--
.../fluss/src/record/kv/kv_record_read_context.rs | 5 +-
crates/fluss/src/rpc/api_key.rs | 4 +
crates/fluss/src/rpc/message/mod.rs | 1 +
crates/fluss/src/rpc/message/produce_log.rs | 2 +-
crates/fluss/src/rpc/message/put_kv.rs | 73 +++++++++++
18 files changed, 360 insertions(+), 66 deletions(-)
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index db85c51..773354e 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -340,8 +340,7 @@ fn python_to_generic_row(
.map(|n| n.to_string())
.unwrap_or_else(|_| "unknown".to_string());
FlussError::new_err(format!(
- "Row must be a dict, list, or tuple; got {}",
- type_name
+ "Row must be a dict, list, or tuple; got {type_name}"
))
})?;
let schema = table_info.row_type();
@@ -357,7 +356,7 @@ fn python_to_generic_row(
.name()
.map(|n| n.to_string())
.unwrap_or_else(|_| "unknown".to_string());
- FlussError::new_err(format!("Row dict keys must be
strings; got {}", key_type))
+ FlussError::new_err(format!("Row dict keys must be
strings; got {key_type}"))
})?;
if fields.iter().all(|f| f.name() != key_str) {
@@ -367,8 +366,7 @@ fn python_to_generic_row(
.collect::<Vec<_>>()
.join(", ");
return Err(FlussError::new_err(format!(
- "Unknown field '{}'. Expected fields: {}",
- key_str, expected
+ "Unknown field '{key_str}'. Expected fields:
{expected}"
)));
}
}
@@ -476,8 +474,7 @@ fn python_value_to_datum(
}
}
_ => Err(FlussError::new_err(format!(
- "Unsupported data type for row-level operations: {:?}",
- data_type
+ "Unsupported data type for row-level operations: {data_type}"
))),
}
}
diff --git a/crates/fluss/build.rs b/crates/fluss/build.rs
index 1564313..265208a 100644
--- a/crates/fluss/build.rs
+++ b/crates/fluss/build.rs
@@ -19,7 +19,10 @@ use std::io::Result;
fn main() -> Result<()> {
let mut config = prost_build::Config::new();
- config.bytes([".proto.PbProduceLogReqForBucket.records"]);
+ config.bytes([
+ ".proto.PbProduceLogReqForBucket.records",
+ ".proto.PbPutKvReqForBucket.records",
+ ]);
config.compile_protos(&["src/proto/fluss_api.proto"], &["src/proto"])?;
Ok(())
}
diff --git a/crates/fluss/src/client/table/append.rs
b/crates/fluss/src/client/table/append.rs
index ad3e55e..6d76f28 100644
--- a/crates/fluss/src/client/table/append.rs
+++ b/crates/fluss/src/client/table/append.rs
@@ -46,6 +46,7 @@ impl TableAppend {
AppendWriter {
table_path: Arc::new(self.table_path.clone()),
writer_client: self.writer_client.clone(),
+ table_info: Arc::new(self.table_info.clone()),
}
}
}
@@ -53,18 +54,24 @@ impl TableAppend {
pub struct AppendWriter {
table_path: Arc<TablePath>,
writer_client: Arc<WriterClient>,
+ table_info: Arc<TableInfo>,
}
impl AppendWriter {
pub async fn append(&self, row: GenericRow<'_>) -> Result<()> {
- let record = WriteRecord::new(self.table_path.clone(), row);
+ let record =
+ WriteRecord::for_append(self.table_path.clone(),
self.table_info.schema_id, row);
let result_handle = self.writer_client.send(&record).await?;
let result = result_handle.wait().await?;
result_handle.result(result)
}
pub async fn append_arrow_batch(&self, batch: RecordBatch) -> Result<()> {
- let record = WriteRecord::new_record_batch(self.table_path.clone(),
batch);
+ let record = WriteRecord::for_append_record_batch(
+ self.table_path.clone(),
+ self.table_info.schema_id,
+ batch,
+ );
let result_handle = self.writer_client.send(&record).await?;
let result = result_handle.wait().await?;
result_handle.result(result)
diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs
b/crates/fluss/src/client/table/log_fetch_buffer.rs
index fb6981f..ac44cc1 100644
--- a/crates/fluss/src/client/table/log_fetch_buffer.rs
+++ b/crates/fluss/src/client/table/log_fetch_buffer.rs
@@ -733,7 +733,7 @@ mod tests {
let mut row = GenericRow::new();
row.set_field(0, 1_i32);
row.set_field(1, "alice");
- let record = WriteRecord::new(table_path, row);
+ let record = WriteRecord::for_append(table_path, 1, row);
builder.append(&record)?;
let data = builder.build()?;
diff --git a/crates/fluss/src/client/table/scanner.rs
b/crates/fluss/src/client/table/scanner.rs
index 3e7d61f..e9b2ce1 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -1446,8 +1446,9 @@ mod tests {
compression_level: DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
},
);
- let record = WriteRecord::new(
+ let record = WriteRecord::for_append(
table_path,
+ 1,
GenericRow {
values: vec![Datum::Int32(1)],
},
diff --git a/crates/fluss/src/client/table/writer.rs
b/crates/fluss/src/client/table/writer.rs
index b2ba881..8a83b5e 100644
--- a/crates/fluss/src/client/table/writer.rs
+++ b/crates/fluss/src/client/table/writer.rs
@@ -43,6 +43,7 @@ pub struct AbstractTableWriter {
table_path: Arc<TablePath>,
writer_client: Arc<WriterClient>,
field_count: i32,
+ schema_id: i32,
}
#[allow(dead_code)]
@@ -57,6 +58,7 @@ impl AbstractTableWriter {
table_path: Arc::new(table_path),
writer_client,
field_count: table_info.row_type().fields().len() as i32,
+ schema_id: table_info.schema_id,
}
}
@@ -82,7 +84,8 @@ pub struct AppendWriterImpl {
#[allow(dead_code)]
impl AppendWriterImpl {
pub async fn append(&self, row: GenericRow<'_>) -> Result<()> {
- let record = WriteRecord::new(self.base.table_path.clone(), row);
+ let record =
+ WriteRecord::for_append(self.base.table_path.clone(),
self.base.schema_id, row);
self.base.send(&record).await
}
}
diff --git a/crates/fluss/src/client/write/accumulator.rs
b/crates/fluss/src/client/write/accumulator.rs
index 83f11ab..0afc9d4 100644
--- a/crates/fluss/src/client/write/accumulator.rs
+++ b/crates/fluss/src/client/write/accumulator.rs
@@ -17,7 +17,7 @@
use crate::client::write::batch::WriteBatch::ArrowLog;
use crate::client::write::batch::{ArrowLogWriteBatch, WriteBatch};
-use crate::client::{Record, ResultHandle, WriteRecord};
+use crate::client::{LogWriteRecord, Record, ResultHandle, WriteRecord};
use crate::cluster::{BucketLocation, Cluster, ServerNode};
use crate::config::Config;
use crate::error::Result;
@@ -110,7 +110,7 @@ impl RecordAccumulator {
row_type,
bucket_id,
current_time_ms(),
- matches!(record.row, Record::RecordBatch(_)),
+ matches!(&record.record,
Record::Log(LogWriteRecord::RecordBatch(_))),
));
let batch_id = batch.batch_id();
@@ -541,8 +541,9 @@ mod tests {
let accumulator = RecordAccumulator::new(config);
let table_path = Arc::new(TablePath::new("db".to_string(),
"tbl".to_string()));
let cluster = Arc::new(build_cluster(table_path.as_ref(), 1, 1));
- let record = WriteRecord::new(
+ let record = WriteRecord::for_append(
table_path.clone(),
+ 1,
GenericRow {
values: vec![Datum::Int32(1)],
},
diff --git a/crates/fluss/src/client/write/batch.rs
b/crates/fluss/src/client/write/batch.rs
index 1f54226..0159753 100644
--- a/crates/fluss/src/client/write/batch.rs
+++ b/crates/fluss/src/client/write/batch.rs
@@ -17,13 +17,13 @@
use crate::BucketId;
use crate::client::broadcast::{BatchWriteResult, BroadcastOnce};
-use crate::client::{ResultHandle, WriteRecord};
+use crate::client::{Record, ResultHandle, WriteRecord};
use crate::compression::ArrowCompressionInfo;
-use crate::error::Result;
-use crate::metadata::{DataType, TablePath};
+use crate::error::{Error, Result};
+use crate::metadata::{DataType, KvFormat, TablePath};
use crate::record::MemoryLogRecordsArrowBuilder;
+use crate::record::kv::KvRecordBatchBuilder;
use bytes::Bytes;
-use parking_lot::Mutex;
use std::cmp::max;
use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
@@ -92,18 +92,28 @@ impl InnerWriteBatch {
pub enum WriteBatch {
ArrowLog(ArrowLogWriteBatch),
+ Kv(KvWriteBatch),
}
impl WriteBatch {
pub fn inner_batch(&self) -> &InnerWriteBatch {
match self {
WriteBatch::ArrowLog(batch) => &batch.write_batch,
+ WriteBatch::Kv(batch) => &batch.write_batch,
+ }
+ }
+
+ pub fn inner_batch_mut(&mut self) -> &mut InnerWriteBatch {
+ match self {
+ WriteBatch::ArrowLog(batch) => &mut batch.write_batch,
+ WriteBatch::Kv(batch) => &mut batch.write_batch,
}
}
pub fn try_append(&mut self, write_record: &WriteRecord) ->
Result<Option<ResultHandle>> {
match self {
WriteBatch::ArrowLog(batch) => batch.try_append(write_record),
+ WriteBatch::Kv(batch) => batch.try_append(write_record),
}
}
@@ -111,11 +121,13 @@ impl WriteBatch {
self.inner_batch().waited_time_ms(now)
}
- pub fn close(&mut self) {
+ pub fn close(&mut self) -> Result<()> {
match self {
WriteBatch::ArrowLog(batch) => {
batch.close();
+ Ok(())
}
+ WriteBatch::Kv(batch) => batch.close(),
}
}
@@ -127,20 +139,18 @@ impl WriteBatch {
pub fn is_closed(&self) -> bool {
match self {
WriteBatch::ArrowLog(batch) => batch.is_closed(),
+ WriteBatch::Kv(batch) => batch.is_closed(),
}
}
pub fn drained(&mut self, now_ms: i64) {
- match self {
- WriteBatch::ArrowLog(batch) => {
- batch.write_batch.drained(now_ms);
- }
- }
+ self.inner_batch_mut().drained(now_ms);
}
- pub fn build(&self) -> Result<Bytes> {
+ pub fn build(&mut self) -> Result<Bytes> {
match self {
WriteBatch::ArrowLog(batch) => batch.build(),
+ WriteBatch::Kv(batch) => batch.build(),
}
}
@@ -172,7 +182,7 @@ impl WriteBatch {
pub struct ArrowLogWriteBatch {
pub write_batch: InnerWriteBatch,
pub arrow_builder: MemoryLogRecordsArrowBuilder,
- built_records: Mutex<Option<Bytes>>,
+ built_records: Option<Bytes>,
}
impl ArrowLogWriteBatch {
@@ -196,7 +206,7 @@ impl ArrowLogWriteBatch {
to_append_record_batch,
arrow_compression_info,
),
- built_records: Mutex::new(None),
+ built_records: None,
}
}
@@ -218,13 +228,12 @@ impl ArrowLogWriteBatch {
}
}
- pub fn build(&self) -> Result<Bytes> {
- let mut cached = self.built_records.lock();
- if let Some(bytes) = cached.as_ref() {
+ pub fn build(&mut self) -> Result<Bytes> {
+ if let Some(bytes) = &self.built_records {
return Ok(bytes.clone());
}
let bytes = Bytes::from(self.arrow_builder.build()?);
- *cached = Some(bytes.clone());
+ self.built_records = Some(bytes.clone());
Ok(bytes)
}
@@ -237,6 +246,96 @@ impl ArrowLogWriteBatch {
}
}
+pub struct KvWriteBatch {
+ write_batch: InnerWriteBatch,
+ kv_batch_builder: KvRecordBatchBuilder,
+ target_columns: Option<Vec<usize>>,
+ schema_id: i32,
+}
+
+impl KvWriteBatch {
+ #[allow(clippy::too_many_arguments)]
+ pub fn new(
+ batch_id: i64,
+ table_path: TablePath,
+ schema_id: i32,
+ write_limit: usize,
+ kv_format: KvFormat,
+ bucket_id: BucketId,
+ target_columns: Option<Vec<usize>>,
+ create_ms: i64,
+ ) -> Self {
+ let base = InnerWriteBatch::new(batch_id, table_path, create_ms,
bucket_id);
+ Self {
+ write_batch: base,
+ kv_batch_builder: KvRecordBatchBuilder::new(schema_id,
write_limit, kv_format),
+ target_columns,
+ schema_id,
+ }
+ }
+
+ pub fn try_append(&mut self, write_record: &WriteRecord) ->
Result<Option<ResultHandle>> {
+ let kv_write_record = match &write_record.record {
+ Record::Kv(record) => record,
+ _ => {
+ return Err(Error::UnsupportedOperation {
+ message: "Only KvRecord to append to KvWriteBatch
".to_string(),
+ });
+ }
+ };
+
+ let key = kv_write_record.key;
+
+ if self.schema_id != write_record.schema_id {
+ return Err(Error::UnexpectedError {
+ message: format!(
+ "schema id {} of the write record to append is not the
same as the current schema id {} in the batch.",
+ write_record.schema_id, self.schema_id
+ ),
+ source: None,
+ });
+ };
+
+ if self.target_columns.as_deref() != kv_write_record.target_columns {
+ return Err(Error::UnexpectedError {
+ message: format!(
+ "target columns {:?} of the write record to append are not
the same as the current target columns {:?} in the batch.",
+ kv_write_record.target_columns,
+ self.target_columns.as_deref()
+ ),
+ source: None,
+ });
+ }
+
+ let row = kv_write_record.compacted_row.as_ref();
+
+ if self.is_closed() || !self.kv_batch_builder.has_room_for_row(key,
row) {
+ Ok(None)
+ } else {
+ // append successfully
+ self.kv_batch_builder
+ .append_row(key, row)
+ .map_err(|e| Error::UnexpectedError {
+ message: "Failed to append row to
KvWriteBatch".to_string(),
+ source: Some(Box::new(e)),
+ })?;
+ Ok(Some(ResultHandle::new(self.write_batch.results.receiver())))
+ }
+ }
+
+ pub fn build(&mut self) -> Result<Bytes> {
+ self.kv_batch_builder.build()
+ }
+
+ pub fn is_closed(&self) -> bool {
+ self.kv_batch_builder.is_closed()
+ }
+
+ pub fn close(&mut self) -> Result<()> {
+ self.kv_batch_builder.close()
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;
diff --git a/crates/fluss/src/client/write/mod.rs
b/crates/fluss/src/client/write/mod.rs
index 00a71c5..248218e 100644
--- a/crates/fluss/src/client/write/mod.rs
+++ b/crates/fluss/src/client/write/mod.rs
@@ -21,7 +21,7 @@ mod batch;
use crate::client::broadcast::{self as client_broadcast, BatchWriteResult,
BroadcastOnceReceiver};
use crate::error::Error;
use crate::metadata::TablePath;
-use crate::row::GenericRow;
+use crate::row::{CompactedRow, GenericRow};
pub use accumulator::*;
use arrow::array::RecordBatch;
use std::sync::Arc;
@@ -36,28 +36,91 @@ mod writer_client;
pub use write_format::WriteFormat;
pub use writer_client::WriterClient;
+#[allow(dead_code)]
pub struct WriteRecord<'a> {
- pub row: Record<'a>,
- pub table_path: Arc<TablePath>,
+ record: Record<'a>,
+ table_path: Arc<TablePath>,
+ bucket_key: Option<&'a [u8]>,
+ schema_id: i32,
+ write_format: WriteFormat,
+}
+
+impl<'a> WriteRecord<'a> {
+ pub fn record(&self) -> &Record<'a> {
+ &self.record
+ }
}
pub enum Record<'a> {
- Row(GenericRow<'a>),
+ Log(LogWriteRecord<'a>),
+ Kv(KvWriteRecord<'a>),
+}
+
+pub enum LogWriteRecord<'a> {
+ Generic(GenericRow<'a>),
RecordBatch(Arc<RecordBatch>),
}
+pub struct KvWriteRecord<'a> {
+ // only valid for primary key table
+ key: &'a [u8],
+ target_columns: Option<&'a [usize]>,
+ compacted_row: Option<CompactedRow<'a>>,
+}
+
+impl<'a> KvWriteRecord<'a> {
+ fn new(
+ key: &'a [u8],
+ target_columns: Option<&'a [usize]>,
+ compacted_row: Option<CompactedRow<'a>>,
+ ) -> Self {
+ KvWriteRecord {
+ key,
+ target_columns,
+ compacted_row,
+ }
+ }
+}
+
impl<'a> WriteRecord<'a> {
- pub fn new(table_path: Arc<TablePath>, row: GenericRow<'a>) -> Self {
+ pub fn for_append(table_path: Arc<TablePath>, schema_id: i32, row:
GenericRow<'a>) -> Self {
+ Self {
+ record: Record::Log(LogWriteRecord::Generic(row)),
+ table_path,
+ bucket_key: None,
+ schema_id,
+ write_format: WriteFormat::ArrowLog,
+ }
+ }
+
+ pub fn for_append_record_batch(
+ table_path: Arc<TablePath>,
+ schema_id: i32,
+ row: RecordBatch,
+ ) -> Self {
Self {
- row: Record::Row(row),
+ record: Record::Log(LogWriteRecord::RecordBatch(Arc::new(row))),
table_path,
+ bucket_key: None,
+ schema_id,
+ write_format: WriteFormat::ArrowLog,
}
}
- pub fn new_record_batch(table_path: Arc<TablePath>, row: RecordBatch) ->
Self {
+ pub fn for_upsert(
+ table_path: Arc<TablePath>,
+ schema_id: i32,
+ bucket_key: &'a [u8],
+ key: &'a [u8],
+ target_columns: Option<&'a [usize]>,
+ row: CompactedRow<'a>,
+ ) -> Self {
Self {
- row: Record::RecordBatch(Arc::new(row)),
+ record: Record::Kv(KvWriteRecord::new(key, target_columns,
Some(row))),
table_path,
+ bucket_key: Some(bucket_key),
+ schema_id,
+ write_format: WriteFormat::CompactedKv,
}
}
}
diff --git a/crates/fluss/src/client/write/sender.rs
b/crates/fluss/src/client/write/sender.rs
index ffac0af..7ea24e3 100644
--- a/crates/fluss/src/client/write/sender.rs
+++ b/crates/fluss/src/client/write/sender.rs
@@ -178,9 +178,9 @@ impl Sender {
};
for (table_id, table_buckets) in write_batch_by_table {
- let request_batches: Vec<&ReadyWriteBatch> = table_buckets
+ let mut request_batches: Vec<ReadyWriteBatch> = table_buckets
.iter()
- .filter_map(|bucket| records_by_bucket.get(bucket))
+ .filter_map(|bucket| records_by_bucket.remove(bucket))
.collect();
if request_batches.is_empty() {
continue;
@@ -189,7 +189,7 @@ impl Sender {
table_id,
acks,
self.max_request_timeout_ms,
- request_batches.as_slice(),
+ &mut request_batches,
) {
Ok(request) => request,
Err(e) => {
@@ -205,6 +205,12 @@ impl Sender {
}
};
+ // let's put it back into records_by_bucket
+ // since response handle will use it.
+ for request_batch in request_batches {
+ records_by_bucket.insert(request_batch.table_bucket.clone(),
request_batch);
+ }
+
let response = match connection.request(request).await {
Ok(response) => response,
Err(e) => {
@@ -462,8 +468,9 @@ mod tests {
cluster: Arc<Cluster>,
table_path: Arc<TablePath>,
) -> Result<(ReadyWriteBatch, crate::client::ResultHandle)> {
- let record = WriteRecord::new(
+ let record = WriteRecord::for_append(
table_path,
+ 1,
GenericRow {
values: vec![Datum::Int32(1)],
},
diff --git a/crates/fluss/src/proto/fluss_api.proto
b/crates/fluss/src/proto/fluss_api.proto
index b4ae840..eaee94c 100644
--- a/crates/fluss/src/proto/fluss_api.proto
+++ b/crates/fluss/src/proto/fluss_api.proto
@@ -119,6 +119,34 @@ message PbProduceLogRespForBucket {
optional int64 base_offset = 5;
}
+// put kv request and response
+message PutKvRequest {
+ required int32 acks = 1;
+ required int64 table_id = 2;
+ required int32 timeout_ms = 3;
+ // the indexes for the columns to write,
+ // if empty, means write all columns
+ repeated int32 target_columns = 4 [packed = true];
+ repeated PbPutKvReqForBucket buckets_req = 5;
+}
+
+message PutKvResponse {
+ repeated PbPutKvRespForBucket buckets_resp = 1;
+}
+
+message PbPutKvReqForBucket {
+ optional int64 partition_id = 1;
+ required int32 bucket_id = 2;
+ required bytes records = 3;
+}
+
+message PbPutKvRespForBucket {
+ optional int64 partition_id = 1;
+ required int32 bucket_id = 2;
+ optional int32 error_code = 3;
+ optional string error_message = 4;
+}
+
message CreateTableRequest {
required PbTablePath table_path = 1;
required bytes table_json = 2;
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index b331ae9..aa48376 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-use crate::client::{Record, WriteRecord};
+use crate::client::{LogWriteRecord, Record, WriteRecord};
use crate::compression::ArrowCompressionInfo;
use crate::error::{Error, Result};
use crate::metadata::DataType;
@@ -275,11 +275,16 @@ impl MemoryLogRecordsArrowBuilder {
}
pub fn append(&mut self, record: &WriteRecord) -> Result<bool> {
- match &record.row {
- Record::Row(row) =>
Ok(self.arrow_record_batch_builder.append(row)?),
- Record::RecordBatch(record_batch) => Ok(self
- .arrow_record_batch_builder
- .append_batch(record_batch.clone())?),
+ match &record.record() {
+ Record::Log(log_write_record) => match log_write_record {
+ LogWriteRecord::Generic(row) =>
Ok(self.arrow_record_batch_builder.append(row)?),
+ LogWriteRecord::RecordBatch(record_batch) => Ok(self
+ .arrow_record_batch_builder
+ .append_batch(record_batch.clone())?),
+ },
+ Record::Kv(_) => Err(Error::UnsupportedOperation {
+ message: "Only LogRecord is supported to append".to_string(),
+ }),
}
// todo: consider write other change type
}
diff --git a/crates/fluss/src/record/kv/kv_record_batch_builder.rs
b/crates/fluss/src/record/kv/kv_record_batch_builder.rs
index 636104d..e3da864 100644
--- a/crates/fluss/src/record/kv/kv_record_batch_builder.rs
+++ b/crates/fluss/src/record/kv/kv_record_batch_builder.rs
@@ -19,9 +19,7 @@
//!
//! This module provides the KvRecordBatchBuilder for building batches of KV
records.
-use bytes::{Bytes, BytesMut};
-use std::io;
-
+use crate::error::{Error, Result};
use crate::metadata::KvFormat;
use crate::record::kv::kv_record::KvRecord;
use crate::record::kv::kv_record_batch::{
@@ -31,6 +29,8 @@ use crate::record::kv::kv_record_batch::{
};
use crate::record::kv::{CURRENT_KV_MAGIC_VALUE, NO_BATCH_SEQUENCE,
NO_WRITER_ID};
use crate::row::BinaryRow;
+use bytes::{Bytes, BytesMut};
+use std::io;
/// Builder for KvRecordBatch.
///
@@ -185,11 +185,12 @@ impl KvRecordBatchBuilder {
/// built bytes may change if mutations occur between builds.
///
/// Note: [`close`](Self::close) prevents further appends but does not
prevent writer state modifications.
- pub fn build(&mut self) -> io::Result<Bytes> {
+ pub fn build(&mut self) -> Result<Bytes> {
if self.aborted {
- return Err(io::Error::other(
- "Attempting to build an aborted record batch",
- ));
+ return Err(Error::UnexpectedError {
+ message: "Attempting to build an aborted record
batch".to_string(),
+ source: None,
+ });
}
if let Some(ref cached) = self.built_buffer {
@@ -225,11 +226,13 @@ impl KvRecordBatchBuilder {
/// Close the builder.
/// After closing, no more records can be appended, but the batch can
still be built.
- pub fn close(&mut self) -> io::Result<()> {
+ pub fn close(&mut self) -> Result<()> {
if self.aborted {
- return Err(io::Error::other(
- "Cannot close KvRecordBatchBuilder as it has already been
aborted",
- ));
+ return Err(Error::UnexpectedError {
+ message: "Cannot close KvRecordBatchBuilder as it has already
been aborted"
+ .to_string(),
+ source: None,
+ });
}
self.is_closed = true;
Ok(())
diff --git a/crates/fluss/src/record/kv/kv_record_read_context.rs
b/crates/fluss/src/record/kv/kv_record_read_context.rs
index 2049c32..fe6c6f0 100644
--- a/crates/fluss/src/record/kv/kv_record_read_context.rs
+++ b/crates/fluss/src/record/kv/kv_record_read_context.rs
@@ -90,8 +90,7 @@ impl ReadContext for KvRecordReadContext {
other => {
return Err(Error::IoUnexpectedError {
message: format!(
- "Schema {} has invalid row type: expected Row, got
{:?}",
- schema_id, other
+ "Schema {schema_id} has invalid row type: expected
Row, got {other:?}"
),
source: std::io::Error::new(
std::io::ErrorKind::InvalidData,
@@ -134,7 +133,7 @@ mod tests {
fn new(data_types: Vec<crate::metadata::DataType>) -> Self {
let mut builder = Schema::builder();
for (i, dt) in data_types.iter().enumerate() {
- builder = builder.column(&format!("field{}", i), dt.clone());
+ builder = builder.column(&format!("field{i}"), dt.clone());
}
let schema = builder.build().expect("Failed to build schema");
diff --git a/crates/fluss/src/rpc/api_key.rs b/crates/fluss/src/rpc/api_key.rs
index 9f9268e..66e4beb 100644
--- a/crates/fluss/src/rpc/api_key.rs
+++ b/crates/fluss/src/rpc/api_key.rs
@@ -30,6 +30,7 @@ pub enum ApiKey {
TableExists,
MetaData,
ProduceLog,
+ PutKv,
FetchLog,
Lookup,
ListOffsets,
@@ -54,6 +55,7 @@ impl From<i16> for ApiKey {
1012 => ApiKey::MetaData,
1014 => ApiKey::ProduceLog,
1015 => ApiKey::FetchLog,
+ 1016 => ApiKey::PutKv,
1017 => ApiKey::Lookup,
1021 => ApiKey::ListOffsets,
1025 => ApiKey::GetFileSystemSecurityToken,
@@ -79,6 +81,7 @@ impl From<ApiKey> for i16 {
ApiKey::MetaData => 1012,
ApiKey::ProduceLog => 1014,
ApiKey::FetchLog => 1015,
+ ApiKey::PutKv => 1016,
ApiKey::Lookup => 1017,
ApiKey::ListOffsets => 1021,
ApiKey::GetFileSystemSecurityToken => 1025,
@@ -108,6 +111,7 @@ mod tests {
(1012, ApiKey::MetaData),
(1014, ApiKey::ProduceLog),
(1015, ApiKey::FetchLog),
+ (1016, ApiKey::PutKv),
(1017, ApiKey::Lookup),
(1021, ApiKey::ListOffsets),
(1025, ApiKey::GetFileSystemSecurityToken),
diff --git a/crates/fluss/src/rpc/message/mod.rs
b/crates/fluss/src/rpc/message/mod.rs
index 2fe506b..4e6c8e1 100644
--- a/crates/fluss/src/rpc/message/mod.rs
+++ b/crates/fluss/src/rpc/message/mod.rs
@@ -36,6 +36,7 @@ mod list_offsets;
mod list_tables;
mod lookup;
mod produce_log;
+mod put_kv;
mod table_exists;
mod update_metadata;
diff --git a/crates/fluss/src/rpc/message/produce_log.rs
b/crates/fluss/src/rpc/message/produce_log.rs
index eb72575..dab7ea9 100644
--- a/crates/fluss/src/rpc/message/produce_log.rs
+++ b/crates/fluss/src/rpc/message/produce_log.rs
@@ -37,7 +37,7 @@ impl ProduceLogRequest {
table_id: i64,
ack: i16,
max_request_timeout_ms: i32,
- ready_batches: &[&ReadyWriteBatch],
+ ready_batches: &mut [ReadyWriteBatch],
) -> FlussResult<Self> {
let mut request = proto::ProduceLogRequest {
table_id,
diff --git a/crates/fluss/src/rpc/message/put_kv.rs
b/crates/fluss/src/rpc/message/put_kv.rs
new file mode 100644
index 0000000..983faa6
--- /dev/null
+++ b/crates/fluss/src/rpc/message/put_kv.rs
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use crate::client::ReadyWriteBatch;
+use crate::proto::{PbPutKvReqForBucket, PutKvResponse};
+use crate::rpc::api_key::ApiKey;
+use crate::rpc::api_version::ApiVersion;
+use crate::rpc::frame::ReadError;
+use crate::rpc::frame::WriteError;
+use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
+use crate::{impl_read_version_type, impl_write_version_type, proto};
+use bytes::{Buf, BufMut};
+use prost::Message;
+
+#[allow(dead_code)]
+pub struct PutKvRequest {
+ pub inner_request: proto::PutKvRequest,
+}
+
+#[allow(dead_code)]
+impl PutKvRequest {
+ pub fn new(
+ table_id: i64,
+ ack: i16,
+ max_request_timeout_ms: i32,
+ target_columns: Vec<i32>,
+ ready_batches: &mut [ReadyWriteBatch],
+ ) -> crate::error::Result<Self> {
+ let mut request = proto::PutKvRequest {
+ table_id,
+ acks: ack as i32,
+ timeout_ms: max_request_timeout_ms,
+ target_columns,
+ ..Default::default()
+ };
+ for ready_batch in ready_batches {
+ request.buckets_req.push(PbPutKvReqForBucket {
+ partition_id: ready_batch.table_bucket.partition_id(),
+ bucket_id: ready_batch.table_bucket.bucket_id(),
+ records: ready_batch.write_batch.build()?,
+ })
+ }
+
+ Ok(PutKvRequest {
+ inner_request: request,
+ })
+ }
+}
+
+impl RequestBody for PutKvRequest {
+ type ResponseBody = PutKvResponse;
+
+ const API_KEY: ApiKey = ApiKey::PutKv;
+
+ const REQUEST_VERSION: ApiVersion = ApiVersion(0);
+}
+
+impl_write_version_type!(PutKvRequest);
+impl_read_version_type!(PutKvResponse);