fresh-borzoni commented on code in PR #156:
URL: https://github.com/apache/fluss-rust/pull/156#discussion_r2686334160


##########
crates/fluss/src/record/kv/kv_record_batch_builder.rs:
##########
@@ -0,0 +1,659 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! KV record batch builder implementation.
+//!
+//! This module provides the KvRecordBatchBuilder for building batches of KV records.
+
+use bytes::{Bytes, BytesMut};
+use std::io;
+
+use crate::metadata::KvFormat;
+use crate::record::kv::kv_record::KvRecord;
+use crate::record::kv::kv_record_batch::{
+    ATTRIBUTES_OFFSET, BATCH_SEQUENCE_OFFSET, CRC_OFFSET, LENGTH_LENGTH, 
LENGTH_OFFSET,
+    MAGIC_OFFSET, RECORD_BATCH_HEADER_SIZE, RECORDS_COUNT_OFFSET, 
SCHEMA_ID_OFFSET,
+    WRITER_ID_OFFSET,
+};
+use crate::record::kv::{CURRENT_KV_MAGIC_VALUE, NO_BATCH_SEQUENCE, 
NO_WRITER_ID};
+use crate::row::compacted::CompactedRowWriter;
+
+/// Builder for KvRecordBatch.
+///
+/// This builder accumulates KV records and produces a serialized batch with proper
+/// header information and checksums.
+pub struct KvRecordBatchBuilder {
+    schema_id: i32,
+    magic: u8,
+    write_limit: usize,
+    buffer: BytesMut,
+    writer_id: i64,
+    batch_sequence: i32,
+    current_record_number: i32,
+    size_in_bytes: usize,
+    is_closed: bool,
+    kv_format: KvFormat,
+    aborted: bool,
+    built_buffer: Option<Bytes>,
+}
+
+impl KvRecordBatchBuilder {
+    /// Create a new KvRecordBatchBuilder.
+    ///
+    /// # Arguments
+    /// * `schema_id` - The schema ID for records in this batch (must fit in i16)
+    /// * `write_limit` - Maximum bytes that can be appended
+    /// * `kv_format` - The KV format (Compacted, Indexed, or Aligned)
+    pub fn new(schema_id: i32, write_limit: usize, kv_format: KvFormat) -> 
Self {
+        assert!(
+            schema_id <= i16::MAX as i32,
+            "schema_id shouldn't be greater than the max value of i16: {}",
+            i16::MAX
+        );
+
+        let mut buffer = 
BytesMut::with_capacity(write_limit.max(RECORD_BATCH_HEADER_SIZE));
+
+        // Reserve space for header (we'll write it at the end)
+        buffer.resize(RECORD_BATCH_HEADER_SIZE, 0);
+
+        Self {
+            schema_id,
+            magic: CURRENT_KV_MAGIC_VALUE,
+            write_limit,
+            buffer,
+            writer_id: NO_WRITER_ID,
+            batch_sequence: NO_BATCH_SEQUENCE,
+            current_record_number: 0,
+            size_in_bytes: RECORD_BATCH_HEADER_SIZE,
+            is_closed: false,
+            kv_format,
+            aborted: false,
+            built_buffer: None,
+        }
+    }
+
+    /// Check if there is room for a new record containing the given key and value.
+    /// If no records have been appended, this always returns true.
+    pub fn has_room_for(&self, key: &[u8], value: Option<&[u8]>) -> bool {
+        self.size_in_bytes + KvRecord::size_of(key, value) <= self.write_limit
+    }
+
+    /// Check if there is room for a new record containing the given key and CompactedRow.
+    /// If no records have been appended, this always returns true.
+    pub fn has_room_for_row(&self, key: &[u8], row_writer: 
Option<&CompactedRowWriter>) -> bool {
+        let value = row_writer.map(|w: &CompactedRowWriter| w.buffer());
+        self.has_room_for(key, value)
+    }
+
+    /// Append a KV record with a CompactedRow value to the batch.
+    ///
+    /// This is the recommended API for KvFormat::COMPACTED batches.
+    ///
+    /// # Arguments
+    /// * `key` - The key bytes
+    /// * `row_writer` - The CompactedRowWriter containing the serialized row data.
+    ///   Pass None for deletion records.
+    ///
+    /// # Errors
+    /// Returns an error if:
+    /// - The builder has been aborted
+    /// - The builder is closed
+    /// - Adding this record would exceed the write limit
+    /// - The maximum number of records is exceeded
+    /// - The KV format is not COMPACTED
+    pub fn append_row(
+        &mut self,
+        key: &[u8],
+        row_writer: Option<&CompactedRowWriter>,
+    ) -> io::Result<()> {
+        if self.kv_format != KvFormat::COMPACTED {
+            return Err(io::Error::new(
+                io::ErrorKind::InvalidInput,
+                "append_row can only be used with KvFormat::COMPACTED",
+            ));
+        }
+
+        let value = row_writer.map(|w: &CompactedRowWriter| w.buffer());
+        self.append(key, value)
+    }
+
+    /// Append a KV record to the batch with raw bytes.
+    ///
+    /// # Arguments
+    /// * `key` - The key bytes
+    /// * `value` - Optional value bytes. If None, this represents a deletion.
+    ///
+    /// # Errors
+    /// Returns an error if:
+    /// - The builder has been aborted
+    /// - The builder is closed
+    /// - Adding this record would exceed the write limit
+    /// - The maximum number of records is exceeded
+    /// - The value format doesn't match the configured KV format
+    pub fn append(&mut self, key: &[u8], value: Option<&[u8]>) -> 
io::Result<()> {
+        if self.aborted {
+            return Err(io::Error::other(
+                "Tried to append a record, but KvRecordBatchBuilder has 
already been aborted",
+            ));
+        }
+
+        if self.is_closed {
+            return Err(io::Error::other(
+                "Tried to append a record, but KvRecordBatchBuilder is closed 
for record appends",
+            ));
+        }
+
+        // Check record count limit before mutation
+        if self.current_record_number == i32::MAX {
+            return Err(io::Error::new(
+                io::ErrorKind::InvalidInput,
+                format!(
+                    "Maximum number of records per batch exceeded, max 
records: {}",
+                    i32::MAX
+                ),
+            ));
+        }
+
+        let record_size = KvRecord::size_of(key, value);
+        if self.size_in_bytes + record_size > self.write_limit {
+            return Err(io::Error::new(
+                io::ErrorKind::WriteZero,
+                format!(
+                    "Adding record would exceed write limit: {} + {} > {}",
+                    self.size_in_bytes, record_size, self.write_limit
+                ),
+            ));
+        }
+
+        self.validate_row_format(value)?;
+
+        let record_byte_size = KvRecord::write_to_buf(&mut self.buffer, key, 
value)?;
+        debug_assert_eq!(record_byte_size, record_size, "Record size 
mismatch");
+
+        self.current_record_number += 1;
+        self.size_in_bytes += record_byte_size;
+
+        // Invalidate cached buffer since we modified the batch
+        self.built_buffer = None;
+
+        Ok(())
+    }
+
+    /// Set the writer state (writer ID and batch base sequence).
+    pub fn set_writer_state(&mut self, writer_id: i64, batch_base_sequence: 
i32) {
+        self.writer_id = writer_id;
+        self.batch_sequence = batch_base_sequence;
+        // Invalidate cached buffer since header fields changed
+        self.built_buffer = None;
+    }
+
+    /// Reset the writer state.
+    /// This triggers a rebuild of the batch header on next build.
+    pub fn reset_writer_state(&mut self, writer_id: i64, batch_sequence: i32) {
+        self.built_buffer = None;
+        self.writer_id = writer_id;
+        self.batch_sequence = batch_sequence;
+    }
+
+    /// Build the batch and return the serialized bytes.
+    /// This can be called multiple times as the batch is cached after the first build.
+    pub fn build(&mut self) -> io::Result<Bytes> {
+        if self.aborted {
+            return Err(io::Error::other(
+                "Attempting to build an aborted record batch",
+            ));
+        }
+
+        if let Some(ref cached) = self.built_buffer {
+            return Ok(cached.clone());
+        }
+
+        self.write_batch_header()?;
+        let bytes = self.buffer.clone().freeze();
+        self.built_buffer = Some(bytes.clone());
+        Ok(bytes)
+    }
+
+    /// Get the writer ID.
+    pub fn writer_id(&self) -> i64 {
+        self.writer_id
+    }
+
+    /// Get the batch sequence.
+    pub fn batch_sequence(&self) -> i32 {
+        self.batch_sequence
+    }
+
+    /// Check if the builder is closed.
+    pub fn is_closed(&self) -> bool {
+        self.is_closed
+    }
+
+    /// Abort the builder.
+    /// After aborting, no more records can be appended and the batch cannot be built.
+    pub fn abort(&mut self) {
+        self.aborted = true;
+    }
+
+    /// Close the builder.
+    /// After closing, no more records can be appended, but the batch can still be built.
+    pub fn close(&mut self) -> io::Result<()> {
+        if self.aborted {
+            return Err(io::Error::other(
+                "Cannot close KvRecordBatchBuilder as it has already been 
aborted",
+            ));
+        }
+        self.is_closed = true;
+        Ok(())
+    }
+
+    /// Get the current size in bytes of the batch.
+    pub fn get_size_in_bytes(&self) -> usize {
+        self.size_in_bytes
+    }
+
+    // ----------------------- Internal methods -------------------------------
+
+    /// Write the batch header.
+    fn write_batch_header(&mut self) -> io::Result<()> {
+        let size_without_length = self.size_in_bytes - LENGTH_LENGTH;
+        let total_size = i32::try_from(size_without_length).map_err(|_| {
+            io::Error::new(
+                io::ErrorKind::InvalidInput,
+                format!("Batch size {} exceeds i32::MAX", size_without_length),
+            )
+        })?;
+
+        // Compute attributes before borrowing buffer mutably
+        let attributes = self.compute_attributes();
+
+        // Write to the beginning of the buffer
+        let header = &mut self.buffer[0..RECORD_BATCH_HEADER_SIZE];
+
+        // Write length
+        header[LENGTH_OFFSET..LENGTH_OFFSET + LENGTH_LENGTH]
+            .copy_from_slice(&total_size.to_be_bytes());
+
+        // Write magic
+        header[MAGIC_OFFSET] = self.magic;
+
+        // Write empty CRC first (will update later)
+        header[CRC_OFFSET..CRC_OFFSET + 
4].copy_from_slice(&0u32.to_be_bytes());
+
+        // Write schema ID
+        header[SCHEMA_ID_OFFSET..SCHEMA_ID_OFFSET + 2]
+            .copy_from_slice(&(self.schema_id as i16).to_be_bytes());
+
+        // Write attributes
+        header[ATTRIBUTES_OFFSET] = attributes;
+
+        // Write writer ID
+        header[WRITER_ID_OFFSET..WRITER_ID_OFFSET + 8]
+            .copy_from_slice(&self.writer_id.to_be_bytes());
+
+        // Write batch sequence
+        header[BATCH_SEQUENCE_OFFSET..BATCH_SEQUENCE_OFFSET + 4]
+            .copy_from_slice(&self.batch_sequence.to_be_bytes());
+
+        // Write record count
+        header[RECORDS_COUNT_OFFSET..RECORDS_COUNT_OFFSET + 4]
+            .copy_from_slice(&self.current_record_number.to_be_bytes());
+
+        // Compute and update CRC
+        let crc = 
crc32c::crc32c(&self.buffer[SCHEMA_ID_OFFSET..self.size_in_bytes]);
+        self.buffer[CRC_OFFSET..CRC_OFFSET + 
4].copy_from_slice(&crc.to_be_bytes());
+
+        Ok(())
+    }
+
+    /// Compute the attributes byte.
+    fn compute_attributes(&self) -> u8 {
+        // Currently no attributes are used
+        0
+    }
+
+    /// Validate the row format according to the KV format.
+    fn validate_row_format(&self, value: Option<&[u8]>) -> io::Result<()> {
+        match self.kv_format {
+            KvFormat::COMPACTED => {
+                if let Some(bytes) = value {
+                    // CompactedRow must have at least a header (null bitmap).
+                    // The minimum size is at least 1 byte for any row (even 1 field has 1 byte header).
+                    // We can't validate the exact header size without knowing field count (arity),
+                    // but we can at least check it's not empty.
+                    if bytes.is_empty() {
+                        return Err(io::Error::new(
+                            io::ErrorKind::InvalidData,
+                            "CompactedRow value cannot be empty (must have at 
least a header)",
+                        ));
+                    }
+                    // Further validation could be done if we stored schema/field count,
+                    // but for now this basic check prevents obvious errors.

Review Comment:
   Removed; we don't really need this validation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to