fresh-borzoni commented on code in PR #156:
URL: https://github.com/apache/fluss-rust/pull/156#discussion_r2686330311


##########
crates/fluss/src/record/kv/kv_record_batch_builder.rs:
##########
@@ -0,0 +1,659 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! KV record batch builder implementation.
+//!
+//! This module provides the KvRecordBatchBuilder for building batches of KV 
records.
+
+use bytes::{Bytes, BytesMut};
+use std::io;
+
+use crate::metadata::KvFormat;
+use crate::record::kv::kv_record::KvRecord;
+use crate::record::kv::kv_record_batch::{
+    ATTRIBUTES_OFFSET, BATCH_SEQUENCE_OFFSET, CRC_OFFSET, LENGTH_LENGTH, 
LENGTH_OFFSET,
+    MAGIC_OFFSET, RECORD_BATCH_HEADER_SIZE, RECORDS_COUNT_OFFSET, 
SCHEMA_ID_OFFSET,
+    WRITER_ID_OFFSET,
+};
+use crate::record::kv::{CURRENT_KV_MAGIC_VALUE, NO_BATCH_SEQUENCE, 
NO_WRITER_ID};
+use crate::row::compacted::CompactedRowWriter;
+
+/// Builder for KvRecordBatch.
+///
+/// This builder accumulates KV records and produces a serialized batch with 
proper
+/// header information and checksums.
+pub struct KvRecordBatchBuilder {
+    schema_id: i32,
+    magic: u8,
+    write_limit: usize,
+    buffer: BytesMut,
+    writer_id: i64,
+    batch_sequence: i32,
+    current_record_number: i32,
+    size_in_bytes: usize,
+    is_closed: bool,
+    kv_format: KvFormat,
+    aborted: bool,
+    built_buffer: Option<Bytes>,
+}
+
+impl KvRecordBatchBuilder {
+    /// Create a new KvRecordBatchBuilder.
+    ///
+    /// # Arguments
+    /// * `schema_id` - The schema ID for records in this batch (must fit in 
i16)
+    /// * `write_limit` - Maximum bytes that can be appended
+    /// * `kv_format` - The KV format (Compacted, Indexed, or Aligned)
+    pub fn new(schema_id: i32, write_limit: usize, kv_format: KvFormat) -> 
Self {
+        assert!(
+            schema_id <= i16::MAX as i32,
+            "schema_id shouldn't be greater than the max value of i16: {}",
+            i16::MAX
+        );
+
+        let mut buffer = 
BytesMut::with_capacity(write_limit.max(RECORD_BATCH_HEADER_SIZE));
+
+        // Reserve space for header (we'll write it at the end)
+        buffer.resize(RECORD_BATCH_HEADER_SIZE, 0);
+
+        Self {
+            schema_id,
+            magic: CURRENT_KV_MAGIC_VALUE,
+            write_limit,
+            buffer,
+            writer_id: NO_WRITER_ID,
+            batch_sequence: NO_BATCH_SEQUENCE,
+            current_record_number: 0,
+            size_in_bytes: RECORD_BATCH_HEADER_SIZE,
+            is_closed: false,
+            kv_format,
+            aborted: false,
+            built_buffer: None,
+        }
+    }
+
+    /// Check if there is room for a new record containing the given key and 
value.
+    /// If no records have been appended, this always returns true.
+    pub fn has_room_for(&self, key: &[u8], value: Option<&[u8]>) -> bool {
+        self.size_in_bytes + KvRecord::size_of(key, value) <= self.write_limit
+    }
+
+    /// Check if there is room for a new record containing the given key and 
CompactedRow.
+    /// If no records have been appended, this always returns true.
+    pub fn has_room_for_row(&self, key: &[u8], row_writer: 
Option<&CompactedRowWriter>) -> bool {
+        let value = row_writer.map(|w: &CompactedRowWriter| w.buffer());
+        self.has_room_for(key, value)
+    }
+
+    /// Append a KV record with a CompactedRow value to the batch.
+    ///
+    /// This is the recommended API for KvFormat::COMPACTED batches.
+    ///
+    /// # Arguments
+    /// * `key` - The key bytes
+    /// * `row_writer` - The CompactedRowWriter containing the serialized row 
data.
+    ///   Pass None for deletion records.
+    ///
+    /// # Errors
+    /// Returns an error if:
+    /// - The builder has been aborted
+    /// - The builder is closed
+    /// - Adding this record would exceed the write limit
+    /// - The maximum number of records is exceeded
+    /// - The KV format is not COMPACTED
+    pub fn append_row(
+        &mut self,
+        key: &[u8],
+        row_writer: Option<&CompactedRowWriter>,
+    ) -> io::Result<()> {
+        if self.kv_format != KvFormat::COMPACTED {
+            return Err(io::Error::new(
+                io::ErrorKind::InvalidInput,
+                "append_row can only be used with KvFormat::COMPACTED",
+            ));
+        }
+
+        let value = row_writer.map(|w: &CompactedRowWriter| w.buffer());
+        self.append(key, value)
+    }
+
+    /// Append a KV record to the batch with raw bytes.
+    ///
+    /// # Arguments
+    /// * `key` - The key bytes
+    /// * `value` - Optional value bytes. If None, this represents a deletion.
+    ///
+    /// # Errors
+    /// Returns an error if:
+    /// - The builder has been aborted
+    /// - The builder is closed
+    /// - Adding this record would exceed the write limit
+    /// - The maximum number of records is exceeded
+    /// - The value format doesn't match the configured KV format
+    pub fn append(&mut self, key: &[u8], value: Option<&[u8]>) -> 
io::Result<()> {
+        if self.aborted {
+            return Err(io::Error::other(
+                "Tried to append a record, but KvRecordBatchBuilder has 
already been aborted",
+            ));
+        }
+
+        if self.is_closed {
+            return Err(io::Error::other(
+                "Tried to append a record, but KvRecordBatchBuilder is closed 
for record appends",
+            ));
+        }
+
+        // Check record count limit before mutation
+        if self.current_record_number == i32::MAX {
+            return Err(io::Error::new(
+                io::ErrorKind::InvalidInput,
+                format!(
+                    "Maximum number of records per batch exceeded, max 
records: {}",
+                    i32::MAX
+                ),
+            ));
+        }
+
+        let record_size = KvRecord::size_of(key, value);
+        if self.size_in_bytes + record_size > self.write_limit {
+            return Err(io::Error::new(
+                io::ErrorKind::WriteZero,
+                format!(
+                    "Adding record would exceed write limit: {} + {} > {}",
+                    self.size_in_bytes, record_size, self.write_limit
+                ),
+            ));
+        }
+
+        self.validate_row_format(value)?;
+
+        let record_byte_size = KvRecord::write_to_buf(&mut self.buffer, key, 
value)?;
+        debug_assert_eq!(record_byte_size, record_size, "Record size 
mismatch");
+
+        self.current_record_number += 1;
+        self.size_in_bytes += record_byte_size;
+
+        // Invalidate cached buffer since we modified the batch
+        self.built_buffer = None;
+
+        Ok(())
+    }
+
+    /// Set the writer state (writer ID and batch base sequence).
+    pub fn set_writer_state(&mut self, writer_id: i64, batch_base_sequence: 
i32) {
+        self.writer_id = writer_id;
+        self.batch_sequence = batch_base_sequence;
+        // Invalidate cached buffer since header fields changed
+        self.built_buffer = None;
+    }
+
+    /// Reset the writer state.
+    /// This triggers a rebuild of the batch header on next build.
+    pub fn reset_writer_state(&mut self, writer_id: i64, batch_sequence: i32) {
+        self.built_buffer = None;
+        self.writer_id = writer_id;
+        self.batch_sequence = batch_sequence;

Review Comment:
   removed, we can just use single method



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to