Copilot commented on code in PR #176:
URL: https://github.com/apache/fluss-rust/pull/176#discussion_r2702108212

##########
crates/fluss/src/rpc/message/put_kv.rs:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use crate::client::ReadyWriteBatch;
+use crate::proto::{PbPutKvReqForBucket, PutKvResponse};
+use crate::rpc::api_key::ApiKey;
+use crate::rpc::api_version::ApiVersion;
+use crate::rpc::frame::ReadError;
+use crate::rpc::frame::WriteError;
+use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
+use crate::{impl_read_version_type, impl_write_version_type, proto};
+use bytes::{Buf, BufMut};
+use prost::Message;
+
+#[allow(dead_code)]
+pub struct PutKvRequest {
+    pub inner_request: proto::PutKvRequest,
+}
+
+#[allow(dead_code)]
+impl PutKvRequest {
+    pub fn new(
+        table_id: i64,
+        ack: i16,
+        max_request_timeout_ms: i32,
+        ready_batches: &mut [ReadyWriteBatch],
+    ) -> crate::error::Result<Self> {
+        let mut request = proto::PutKvRequest {
+            table_id,
+            acks: ack as i32,
+            timeout_ms: max_request_timeout_ms,
+            ..Default::default()
+        };
+        for ready_batch in ready_batches {
+            request.buckets_req.push(PbPutKvReqForBucket {
+                partition_id: ready_batch.table_bucket.partition_id(),
+                bucket_id: ready_batch.table_bucket.bucket_id(),
+                records: ready_batch.write_batch.build()?,
+            })
+        }
+
+        Ok(PutKvRequest {
+            inner_request: request,
+        })
+    }

Review Comment:
   The PutKvRequest protobuf message includes a `target_columns` field (line 129 in fluss_api.proto) for specifying which columns to write, but this field is never populated in the PutKvRequest::new() constructor. If target_columns should be set based on the ready_batches, it needs to be extracted and set. If all columns are always written, this is acceptable, but consider documenting this behavior.
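   If the columns are meant to flow through from the batches, one option is to derive them from the first ready batch before the loop, since all batches in a single request should share the same target columns (KvWriteBatch::try_append rejects records whose target columns differ). A minimal sketch, assuming a hypothetical `target_columns()` accessor on the write batch returning `Option<&[usize]>`, and that the proto field is a `repeated int32`:

   ```rust
   // Sketch only: `target_columns()` is a hypothetical accessor on the KV write
   // batch, and `PutKvRequest.target_columns` is assumed to map to a Vec<i32>.
   if let Some(cols) = ready_batches.first().and_then(|b| b.write_batch.target_columns()) {
       // Convert the usize column indexes into the proto's i32 representation.
       request.target_columns = cols.iter().map(|&c| c as i32).collect();
   }
   ```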
##########
crates/fluss/src/rpc/message/put_kv.rs:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use crate::client::ReadyWriteBatch;
+use crate::proto::{PbPutKvReqForBucket, PutKvResponse};
+use crate::rpc::api_key::ApiKey;
+use crate::rpc::api_version::ApiVersion;
+use crate::rpc::frame::ReadError;
+use crate::rpc::frame::WriteError;
+use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
+use crate::{impl_read_version_type, impl_write_version_type, proto};
+use bytes::{Buf, BufMut};
+use prost::Message;
+
+#[allow(dead_code)]
+pub struct PutKvRequest {
+    pub inner_request: proto::PutKvRequest,
+}
+
+#[allow(dead_code)]

Review Comment:
   The PutKvRequest struct and its implementation are marked with `#[allow(dead_code)]`, but they're intended to be used in this feature. If this code is not yet used but is intended for future use, consider adding a TODO comment explaining when it will be integrated. Otherwise, remove the dead_code attribute once the request is actually being sent.
   ```suggestion
   #[allow(dead_code)]
   // TODO: Remove this allow once PutKvRequest is wired into the request-sending path
   // and no longer considered dead code.
   pub struct PutKvRequest {
       pub inner_request: proto::PutKvRequest,
   }

   #[allow(dead_code)]
   // TODO: Remove this allow once PutKvRequest::new is used by the PutKv feature
   // to construct and send requests.
   ```


##########
bindings/python/src/table.rs:
##########
@@ -476,8 +474,7 @@ fn python_value_to_datum(
         }
     }
     _ => Err(FlussError::new_err(format!(
-        "Unsupported data type for row-level operations: {:?}",
-        data_type
+        "Unsupported data type for row-level operations: {data_type:?}"

Review Comment:
   These string formatting changes (converting from format! with placeholders to inline format strings) appear to be unrelated to the main purpose of this PR, which is to introduce KvWriteBatch and PutKvRequest. Consider moving these style improvements to a separate PR to keep changes focused on the stated purpose.
   ```suggestion
       "Unsupported data type for row-level operations: {:?}",
       data_type
   ```


##########
crates/fluss/src/client/write/batch.rs:
##########
@@ -237,6 +246,97 @@ impl ArrowLogWriteBatch {
     }
 }
+pub struct KvWriteBatch {
+    write_batch: InnerWriteBatch,
+    kv_batch_builder: KvRecordBatchBuilder,
+    target_columns: Option<Vec<usize>>,
+    schema_id: i32,
+}
+
+impl KvWriteBatch {
+    #[allow(clippy::too_many_arguments)]
+    pub fn new(
+        batch_id: i64,
+        table_path: TablePath,
+        schema_id: i32,
+        write_limit: usize,
+        kv_format: KvFormat,
+        bucket_id: BucketId,
+        target_columns: Option<Vec<usize>>,
+        create_ms: i64,
+    ) -> Self {
+        let base = InnerWriteBatch::new(batch_id, table_path, create_ms, bucket_id);
+        Self {
+            write_batch: base,
+            kv_batch_builder: KvRecordBatchBuilder::new(schema_id, write_limit, kv_format),
+            target_columns,
+            schema_id,
+        }
+    }
+
+    pub fn try_append(&mut self, write_record: &WriteRecord) -> Result<Option<ResultHandle>> {
+        let key = write_record.key.ok_or_else(|| Error::UnexpectedError {
+            message: "The key for write record of KvWriteBatch must not null".to_string(),

Review Comment:
   The error message should read "must not be null" instead of "must not null"; the phrase "must not null" is grammatically incorrect.
   ```suggestion
               message: "The key for write record of KvWriteBatch must not be null".to_string(),
   ```


##########
crates/fluss/src/rpc/message/mod.rs:
##########
@@ -20,6 +20,7 @@ use crate::rpc::api_version::ApiVersion;
 use crate::rpc::frame::{ReadError, WriteError};
 use bytes::{Buf, BufMut};
+mod put_kv;

Review Comment:
   The `put_kv` module is declared but not exported in the public API. Add `pub use put_kv::*;` to export the PutKvRequest type so it can be used by other modules.
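   Concretely, the declaration could be paired with a re-export; a sketch (whether to use a wildcard or a narrower `pub use put_kv::PutKvRequest;` depends on how the sibling message modules are exposed, which isn't visible in this hunk):

   ```rust
   // crates/fluss/src/rpc/message/mod.rs (sketch)
   mod put_kv;
   // Re-export the request type so it is reachable as crate::rpc::message::PutKvRequest.
   pub use put_kv::*;
   ```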
##########
crates/fluss/src/client/write/mod.rs:
##########
@@ -33,31 +33,80 @@ mod sender;
 mod write_format;
 mod writer_client;
+use crate::client::Record::Compacted;

Review Comment:
   The `use crate::client::Record::Compacted` import appears to be unused in this file. The `Compacted` variant is only referenced with its full path `Record::Compacted` in the for_upsert method. Consider removing this unused import.
   ```suggestion
   ```


##########
crates/fluss/src/client/write/batch.rs:
##########
@@ -237,6 +246,97 @@ impl ArrowLogWriteBatch {
     }
 }
+pub struct KvWriteBatch {
+    write_batch: InnerWriteBatch,
+    kv_batch_builder: KvRecordBatchBuilder,
+    target_columns: Option<Vec<usize>>,
+    schema_id: i32,
+}
+
+impl KvWriteBatch {
+    #[allow(clippy::too_many_arguments)]
+    pub fn new(
+        batch_id: i64,
+        table_path: TablePath,
+        schema_id: i32,
+        write_limit: usize,
+        kv_format: KvFormat,
+        bucket_id: BucketId,
+        target_columns: Option<Vec<usize>>,
+        create_ms: i64,
+    ) -> Self {
+        let base = InnerWriteBatch::new(batch_id, table_path, create_ms, bucket_id);
+        Self {
+            write_batch: base,
+            kv_batch_builder: KvRecordBatchBuilder::new(schema_id, write_limit, kv_format),
+            target_columns,
+            schema_id,
+        }
+    }
+
+    pub fn try_append(&mut self, write_record: &WriteRecord) -> Result<Option<ResultHandle>> {
+        let key = write_record.key.ok_or_else(|| Error::UnexpectedError {
+            message: "The key for write record of KvWriteBatch must not null".to_string(),
+            source: None,
+        })?;
+
+        if self.schema_id != write_record.schema_id {
+            return Err(Error::UnexpectedError {
+                message: format!(
+                    "schema id {} of the write record to append is not the same as the current schema id {} in the batch.",
+                    write_record.schema_id, self.schema_id
+                ),
+                source: None,
+            });
+        };
+
+        if self.target_columns.as_deref() != write_record.target_columns {
+            return Err(Error::UnexpectedError {
+                message: format!(
+                    "target columns {:?} of the write record to append are not the same as the current target columns {:?} in the batch.",
+                    write_record.target_columns,
+                    self.target_columns.as_deref()
+                ),
+                source: None,
+            });
+        }
+
+        let row = match &write_record.record {
+            Record::Compacted(row) => row.as_ref(),
+            _ => {
+                return Err(Error::UnsupportedOperation {
+                    message: "Only Compacted row is supported for KvWriteBatch".to_string(),
+                });
+            }
+        };
+
+        if self.is_closed() || !self.kv_batch_builder.has_room_for_row(key, row) {
+            Ok(None)
+        } else {
+            // append successfully
+            self.kv_batch_builder
+                .append_row(key, row)
+                .map_err(|e| Error::UnexpectedError {
+                    message: "Fail to append row to KvWriteBatch".to_string(),

Review Comment:
   The error message uses "Fail", which should be "Failed" for grammatical correctness and consistency with other error messages in the codebase (e.g., line 201 in sender.rs uses "Failed to build produce request").
   ```suggestion
                       message: "Failed to append row to KvWriteBatch".to_string(),
   ```


##########
crates/fluss/src/client/write/mod.rs:
##########
@@ -33,31 +33,80 @@ mod sender;
 mod write_format;
 mod writer_client;
+use crate::client::Record::Compacted;
 pub use write_format::WriteFormat;
 pub use writer_client::WriterClient;
+#[allow(dead_code)]
 pub struct WriteRecord<'a> {
-    pub row: Record<'a>,
-    pub table_path: Arc<TablePath>,
+    record: Record<'a>,
+    table_path: Arc<TablePath>,
+    bucket_key: Option<&'a [u8]>,
+    schema_id: i32,
+    write_format: WriteFormat,
+
+    // only valid for primary key table
+    key: Option<&'a [u8]>,
+    target_columns: Option<&'a [usize]>,

Review Comment:
   Multiple fields of WriteRecord have been changed from public to private (table_path, record, bucket_key, schema_id, write_format, key, target_columns), but they are still being directly accessed in several places:
   - `table_path` is accessed in accumulator.rs and writer_client.rs
   - `key`, `schema_id`, `target_columns`, and `record` are accessed in batch.rs (KvWriteBatch::try_append)
   - `record` is accessed in accumulator.rs and arrow.rs

   These fields need either:
   1. to remain public, or
   2. to have public getter methods added (e.g., `pub fn table_path(&self) -> &Arc<TablePath>`, `pub fn key(&self) -> Option<&[u8]>`, etc.)

   Without this, the code will not compile.
   ```suggestion
       pub record: Record<'a>,
       pub table_path: Arc<TablePath>,
       pub bucket_key: Option<&'a [u8]>,
       pub schema_id: i32,
       pub write_format: WriteFormat,

       // only valid for primary key table
       pub key: Option<&'a [u8]>,
       pub target_columns: Option<&'a [usize]>,
   ```
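   If the getter route (option 2) is preferred over making the fields public again, a minimal sketch of the accessors the listed call sites would need, with return types read off the field declarations in this hunk:

   ```rust
   impl<'a> WriteRecord<'a> {
       /// Row payload appended into a batch.
       pub fn record(&self) -> &Record<'a> {
           &self.record
       }

       /// Table this record is destined for.
       pub fn table_path(&self) -> &Arc<TablePath> {
           &self.table_path
       }

       /// Encoded key; only set for primary key tables.
       pub fn key(&self) -> Option<&'a [u8]> {
           self.key
       }

       /// Schema id the record was encoded with.
       pub fn schema_id(&self) -> i32 {
           self.schema_id
       }

       /// Column indexes to write, if restricted to a subset.
       pub fn target_columns(&self) -> Option<&'a [usize]> {
           self.target_columns
       }
   }
   ```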
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
