leekeiabstraction commented on code in PR #156:
URL: https://github.com/apache/fluss-rust/pull/156#discussion_r2685793866
##########
crates/fluss/src/record/kv/kv_record_batch.rs:
##########
@@ -0,0 +1,370 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! KV record batch implementation.
+//!
+//! The schema of a KvRecordBatch is:
+//! - Length => Int32
+//! - Magic => Int8
+//! - CRC => Uint32
+//! - SchemaId => Int16
+//! - Attributes => Int8
+//! - WriterId => Int64
+//! - BatchSequence => Int32
+//! - RecordCount => Int32
+//! - Records => [Record]
+//!
+//! The CRC covers data from the SchemaId to the end of the batch.
+
+use bytes::Bytes;
+use std::io;
+
+use crate::record::kv::KvRecord;
+
+// Field lengths in bytes
+pub const LENGTH_LENGTH: usize = 4;
+pub const MAGIC_LENGTH: usize = 1;
+pub const CRC_LENGTH: usize = 4;
+pub const SCHEMA_ID_LENGTH: usize = 2;
+pub const ATTRIBUTE_LENGTH: usize = 1;
+pub const WRITER_ID_LENGTH: usize = 8;
+pub const BATCH_SEQUENCE_LENGTH: usize = 4;
+pub const RECORDS_COUNT_LENGTH: usize = 4;
+
+// Field offsets
+pub const LENGTH_OFFSET: usize = 0;
+pub const MAGIC_OFFSET: usize = LENGTH_OFFSET + LENGTH_LENGTH;
+pub const CRC_OFFSET: usize = MAGIC_OFFSET + MAGIC_LENGTH;
+pub const SCHEMA_ID_OFFSET: usize = CRC_OFFSET + CRC_LENGTH;
+pub const ATTRIBUTES_OFFSET: usize = SCHEMA_ID_OFFSET + SCHEMA_ID_LENGTH;
+pub const WRITER_ID_OFFSET: usize = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
+pub const BATCH_SEQUENCE_OFFSET: usize = WRITER_ID_OFFSET + WRITER_ID_LENGTH;
+pub const RECORDS_COUNT_OFFSET: usize = BATCH_SEQUENCE_OFFSET + BATCH_SEQUENCE_LENGTH;
+pub const RECORDS_OFFSET: usize = RECORDS_COUNT_OFFSET + RECORDS_COUNT_LENGTH;
+
+/// Total header size
+pub const RECORD_BATCH_HEADER_SIZE: usize = RECORDS_OFFSET;
+
+/// Overhead of the batch (length field)
+pub const KV_OVERHEAD: usize = LENGTH_OFFSET + LENGTH_LENGTH;
+
+/// A KV record batch.
+///
+/// This struct provides read access to a serialized KV record batch.
+pub struct KvRecordBatch {
+    data: Bytes,
+    position: usize,
+}
+
+impl KvRecordBatch {
+    /// Create a new KvRecordBatch pointing to the given data at the specified position.
+    pub fn new(data: Bytes, position: usize) -> Self {
+        Self { data, position }
+    }
+
+    /// Get the size in bytes of this batch.
+    /// Returns 0 if the batch is invalid or length is negative.
+    pub fn size_in_bytes(&self) -> usize {
+        if self.data.len() < self.position.saturating_add(LENGTH_LENGTH) {
+            return 0;
+        }
+        let length_i32 = i32::from_be_bytes([
+            self.data[self.position],
+            self.data[self.position + 1],
+            self.data[self.position + 2],
+            self.data[self.position + 3],
+        ]);
+
+        if length_i32 < 0 {
+            return 0;

Review Comment:
   Should we change the method signature to return Result<usize> and return Err here? Otherwise, if this edge case occurs, failure will happen later and it will be non-obvious to debug.
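   For illustration, a minimal sketch of what such a fallible signature could look like; the choice of `io::Error` and the messages are only placeholders, not the PR's actual API:

   ```rust
   // Hypothetical sketch, not the PR's code: report malformed input as an error
   // instead of a 0 sentinel so the failure surfaces at the read site.
   pub fn size_in_bytes(&self) -> Result<usize, io::Error> {
       if self.data.len() < self.position.saturating_add(LENGTH_LENGTH) {
           return Err(io::Error::new(
               io::ErrorKind::UnexpectedEof,
               "buffer too small to hold the batch length field",
           ));
       }
       let length = i32::from_be_bytes([
           self.data[self.position],
           self.data[self.position + 1],
           self.data[self.position + 2],
           self.data[self.position + 3],
       ]);
       if length < 0 {
           return Err(io::Error::new(
               io::ErrorKind::InvalidData,
               format!("negative batch length: {length}"),
           ));
       }
       Ok((length as usize).saturating_add(LENGTH_LENGTH))
   }
   ```

   A caller could then propagate the error with `?` rather than debugging a mysterious zero-length batch later.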
##########
crates/fluss/src/record/kv/kv_record_batch.rs:
##########
@@ -0,0 +1,370 @@
+    /// Get the size in bytes of this batch.
+    /// Returns 0 if the batch is invalid or length is negative.
+    pub fn size_in_bytes(&self) -> usize {
+        if self.data.len() < self.position.saturating_add(LENGTH_LENGTH) {
+            return 0;
+        }
+        let length_i32 = i32::from_be_bytes([

Review Comment:
   The Java side's implementation uses a MemorySegment object, which reads in LE order:

   > Reads an int value (32bit, 4 bytes) from the given position, in little-endian byte order.
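   For comparison, a little-endian read of the same field would look like the hypothetical helper below (assuming the batch length really is serialized in LE order, as the Java MemorySegment-based reader implies):

   ```rust
   // Hypothetical helper, not part of the PR: read the 4-byte length field in
   // little-endian order to mirror Java's MemorySegment reads.
   fn read_length_le(data: &[u8], position: usize) -> i32 {
       i32::from_le_bytes([
           data[position],
           data[position + 1],
           data[position + 2],
           data[position + 3],
       ])
   }
   ```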
##########
crates/fluss/src/record/kv/kv_record_batch.rs:
##########
@@ -0,0 +1,370 @@
+    /// Check if this batch is valid by verifying the checksum.
+    pub fn is_valid(&self) -> bool {
+        if self.size_in_bytes() < RECORD_BATCH_HEADER_SIZE {
+            return false;
+        }
+        if self.data.len() < self.position.saturating_add(RECORD_BATCH_HEADER_SIZE) {
+            return false;
+        }
+
+        if self.record_count() < 0 {

Review Comment:
   Similarly here.
##########
crates/fluss/src/record/kv/kv_record_batch.rs:
##########
@@ -0,0 +1,370 @@
+    /// Get the attributes byte.
+    /// Returns 0 if the batch is too short to contain a valid header.
+    pub fn attributes(&self) -> u8 {

Review Comment:
   I do not see this function defined/used on the Java side.

   Also, the same comment about returning Err instead of 0 in the erroneous case applies to the rest of this file.
##########
crates/fluss/src/record/kv/kv_record_batch.rs:
##########
@@ -0,0 +1,370 @@
+    /// Get the batch sequence.
+    /// Returns 0 if the batch is too short to contain a valid header.
+    pub fn batch_sequence(&self) -> i32 {
+        if self.data.len()
+            < self
+                .position
+                .saturating_add(BATCH_SEQUENCE_OFFSET)
+                .saturating_add(4)
+        {
+            return 0;
+        }
+        i32::from_be_bytes([

Review Comment:
   Similar comment on endianness.
##########
crates/fluss/src/record/kv/kv_record_batch.rs:
##########
@@ -0,0 +1,370 @@
+    /// Get the schema ID.
+    /// Returns 0 if the batch is too short to contain a valid header.
+    pub fn schema_id(&self) -> i16 {
+        if self.data.len()
+            < self
+                .position
+                .saturating_add(SCHEMA_ID_OFFSET)
+                .saturating_add(2)
+        {
+            return 0;
+        }
+        i16::from_be_bytes([

Review Comment:
   Similar comment on endianness.
##########
crates/fluss/src/record/kv/kv_record_batch.rs:
##########
@@ -0,0 +1,370 @@
+    /// Get the checksum.
+    /// Returns 0 if the batch is too short to contain a valid header.
+    pub fn checksum(&self) -> u32 {
+        if self.data.len() < self.position.saturating_add(CRC_OFFSET).saturating_add(4) {
+            return 0;
+        }
+        u32::from_be_bytes([

Review Comment:
   Similar comment on maintaining parity with the Java side, which reads in LE order through MemorySegment.
##########
crates/fluss/src/record/kv/kv_record_batch.rs:
##########
@@ -0,0 +1,370 @@
+    /// Compute the checksum of this batch.
+    pub fn compute_checksum(&self) -> u32 {

Review Comment:
   Similar comment on returning Err for the rest of the file instead of returning 0 in the erroneous case.
##########
crates/fluss/src/record/kv/kv_record_batch.rs:
##########
@@ -0,0 +1,370 @@
+    /// Get the magic byte.
+    /// Returns 0 if the batch is too short to contain a valid header.
+    pub fn magic(&self) -> u8 {

Review Comment:
   Similar comment on returning Result<u8> instead of zero to fail fast.
##########
crates/fluss/src/record/kv/kv_record_batch.rs:
##########
@@ -0,0 +1,370 @@
+    /// Check if this batch is valid by verifying the checksum.
+    pub fn is_valid(&self) -> bool {
+        if self.size_in_bytes() < RECORD_BATCH_HEADER_SIZE {
+            return false;
+        }
+        if self.data.len() < self.position.saturating_add(RECORD_BATCH_HEADER_SIZE) {

Review Comment:
   Curious about this check, which does not seem to be in Java's DefaultKvRecordBatch. Did an edge case arise in your testing that necessitated this additional check? If so, should we also port it to the Java side?
##########
crates/fluss/src/record/kv/kv_record_batch.rs:
##########
@@ -0,0 +1,370 @@
+    /// Get the writer ID.
+    /// Returns 0 if the batch is too short to contain a valid header.
+    pub fn writer_id(&self) -> i64 {

Review Comment:
   Similar comment on returning Err for the rest of the file instead of returning 0 in the erroneous case.
##########
crates/fluss/src/record/kv/kv_record_batch.rs:
##########
@@ -0,0 +1,370 @@
+    /// Get the schema ID.
+    /// Returns 0 if the batch is too short to contain a valid header.
+    pub fn schema_id(&self) -> i16 {

Review Comment:
   Similar comment on returning Err for the rest of the file instead of returning 0 in the erroneous case.
+ /// Returns 0 if the batch is invalid or length is negative. + pub fn size_in_bytes(&self) -> usize { + if self.data.len() < self.position.saturating_add(LENGTH_LENGTH) { + return 0; + } + let length_i32 = i32::from_be_bytes([ + self.data[self.position], + self.data[self.position + 1], + self.data[self.position + 2], + self.data[self.position + 3], + ]); + + if length_i32 < 0 { + return 0; + } + + let length = length_i32 as usize; + + length.saturating_add(LENGTH_LENGTH) + } + + /// Check if this batch is valid by verifying the checksum. + pub fn is_valid(&self) -> bool { + if self.size_in_bytes() < RECORD_BATCH_HEADER_SIZE { + return false; + } + if self.data.len() < self.position.saturating_add(RECORD_BATCH_HEADER_SIZE) { + return false; + } + + if self.record_count() < 0 { + return false; + } + self.checksum() == self.compute_checksum() + } + + /// Get the magic byte. + /// Returns 0 if the batch is too short to contain a valid header. + pub fn magic(&self) -> u8 { + if self.data.len() < self.position.saturating_add(MAGIC_OFFSET).saturating_add(1) { + return 0; + } + self.data[self.position + MAGIC_OFFSET] + } + + /// Get the checksum. + /// Returns 0 if the batch is too short to contain a valid header. + pub fn checksum(&self) -> u32 { + if self.data.len() < self.position.saturating_add(CRC_OFFSET).saturating_add(4) { + return 0; + } + u32::from_be_bytes([ + self.data[self.position + CRC_OFFSET], + self.data[self.position + CRC_OFFSET + 1], + self.data[self.position + CRC_OFFSET + 2], + self.data[self.position + CRC_OFFSET + 3], + ]) + } + + /// Compute the checksum of this batch. + pub fn compute_checksum(&self) -> u32 { + let size = self.size_in_bytes(); + if size < RECORD_BATCH_HEADER_SIZE { + return 0; + } + + let start = self.position.saturating_add(SCHEMA_ID_OFFSET); + let end = self.position.saturating_add(size); + + if end > self.data.len() || start >= end { + return 0; + } + + crc32c::crc32c(&self.data[start..end]) + } + + /// Get the schema ID. + /// Returns 0 if the batch is too short to contain a valid header. + pub fn schema_id(&self) -> i16 { + if self.data.len() + < self + .position + .saturating_add(SCHEMA_ID_OFFSET) + .saturating_add(2) + { + return 0; + } + i16::from_be_bytes([ + self.data[self.position + SCHEMA_ID_OFFSET], + self.data[self.position + SCHEMA_ID_OFFSET + 1], + ]) + } + + /// Get the attributes byte. + /// Returns 0 if the batch is too short to contain a valid header. + pub fn attributes(&self) -> u8 { + if self.data.len() + < self + .position + .saturating_add(ATTRIBUTES_OFFSET) + .saturating_add(1) + { + return 0; + } + self.data[self.position + ATTRIBUTES_OFFSET] + } + + /// Get the writer ID. + /// Returns 0 if the batch is too short to contain a valid header. + pub fn writer_id(&self) -> i64 { + if self.data.len() + < self + .position + .saturating_add(WRITER_ID_OFFSET) + .saturating_add(8) + { + return 0; + } + i64::from_be_bytes([ + self.data[self.position + WRITER_ID_OFFSET], + self.data[self.position + WRITER_ID_OFFSET + 1], + self.data[self.position + WRITER_ID_OFFSET + 2], + self.data[self.position + WRITER_ID_OFFSET + 3], + self.data[self.position + WRITER_ID_OFFSET + 4], + self.data[self.position + WRITER_ID_OFFSET + 5], + self.data[self.position + WRITER_ID_OFFSET + 6], + self.data[self.position + WRITER_ID_OFFSET + 7], + ]) + } + + /// Get the batch sequence. + /// Returns 0 if the batch is too short to contain a valid header. 
+ pub fn batch_sequence(&self) -> i32 { Review Comment: Similar comment on returning Err for the rest of doc instead of returning 0 for erroneous case ########## crates/fluss/src/record/kv/kv_record_batch.rs: ########## @@ -0,0 +1,370 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! KV record batch implementation. +//! +//! The schema of a KvRecordBatch is: +//! - Length => Int32 +//! - Magic => Int8 +//! - CRC => Uint32 +//! - SchemaId => Int16 +//! - Attributes => Int8 +//! - WriterId => Int64 +//! - BatchSequence => Int32 +//! - RecordCount => Int32 +//! - Records => [Record] +//! +//! The CRC covers data from the SchemaId to the end of the batch. + +use bytes::Bytes; +use std::io; + +use crate::record::kv::KvRecord; + +// Field lengths in bytes +pub const LENGTH_LENGTH: usize = 4; +pub const MAGIC_LENGTH: usize = 1; +pub const CRC_LENGTH: usize = 4; +pub const SCHEMA_ID_LENGTH: usize = 2; +pub const ATTRIBUTE_LENGTH: usize = 1; +pub const WRITER_ID_LENGTH: usize = 8; +pub const BATCH_SEQUENCE_LENGTH: usize = 4; +pub const RECORDS_COUNT_LENGTH: usize = 4; + +// Field offsets +pub const LENGTH_OFFSET: usize = 0; +pub const MAGIC_OFFSET: usize = LENGTH_OFFSET + LENGTH_LENGTH; +pub const CRC_OFFSET: usize = MAGIC_OFFSET + MAGIC_LENGTH; +pub const SCHEMA_ID_OFFSET: usize = CRC_OFFSET + CRC_LENGTH; +pub const ATTRIBUTES_OFFSET: usize = SCHEMA_ID_OFFSET + SCHEMA_ID_LENGTH; +pub const WRITER_ID_OFFSET: usize = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH; +pub const BATCH_SEQUENCE_OFFSET: usize = WRITER_ID_OFFSET + WRITER_ID_LENGTH; +pub const RECORDS_COUNT_OFFSET: usize = BATCH_SEQUENCE_OFFSET + BATCH_SEQUENCE_LENGTH; +pub const RECORDS_OFFSET: usize = RECORDS_COUNT_OFFSET + RECORDS_COUNT_LENGTH; + +/// Total header size +pub const RECORD_BATCH_HEADER_SIZE: usize = RECORDS_OFFSET; + +/// Overhead of the batch (length field) +pub const KV_OVERHEAD: usize = LENGTH_OFFSET + LENGTH_LENGTH; + +/// A KV record batch. +/// +/// This struct provides read access to a serialized KV record batch. +pub struct KvRecordBatch { + data: Bytes, + position: usize, +} + +impl KvRecordBatch { + /// Create a new KvRecordBatch pointing to the given data at the specified position. + pub fn new(data: Bytes, position: usize) -> Self { + Self { data, position } + } + + /// Get the size in bytes of this batch. + /// Returns 0 if the batch is invalid or length is negative. 
+ pub fn size_in_bytes(&self) -> usize { + if self.data.len() < self.position.saturating_add(LENGTH_LENGTH) { + return 0; + } + let length_i32 = i32::from_be_bytes([ + self.data[self.position], + self.data[self.position + 1], + self.data[self.position + 2], + self.data[self.position + 3], + ]); + + if length_i32 < 0 { + return 0; + } + + let length = length_i32 as usize; + + length.saturating_add(LENGTH_LENGTH) + } + + /// Check if this batch is valid by verifying the checksum. + pub fn is_valid(&self) -> bool { + if self.size_in_bytes() < RECORD_BATCH_HEADER_SIZE { + return false; + } + if self.data.len() < self.position.saturating_add(RECORD_BATCH_HEADER_SIZE) { + return false; + } + + if self.record_count() < 0 { + return false; + } + self.checksum() == self.compute_checksum() + } + + /// Get the magic byte. + /// Returns 0 if the batch is too short to contain a valid header. + pub fn magic(&self) -> u8 { + if self.data.len() < self.position.saturating_add(MAGIC_OFFSET).saturating_add(1) { + return 0; + } + self.data[self.position + MAGIC_OFFSET] + } + + /// Get the checksum. + /// Returns 0 if the batch is too short to contain a valid header. + pub fn checksum(&self) -> u32 { + if self.data.len() < self.position.saturating_add(CRC_OFFSET).saturating_add(4) { + return 0; + } + u32::from_be_bytes([ + self.data[self.position + CRC_OFFSET], + self.data[self.position + CRC_OFFSET + 1], + self.data[self.position + CRC_OFFSET + 2], + self.data[self.position + CRC_OFFSET + 3], + ]) + } + + /// Compute the checksum of this batch. + pub fn compute_checksum(&self) -> u32 { + let size = self.size_in_bytes(); + if size < RECORD_BATCH_HEADER_SIZE { + return 0; + } + + let start = self.position.saturating_add(SCHEMA_ID_OFFSET); + let end = self.position.saturating_add(size); + + if end > self.data.len() || start >= end { + return 0; + } + + crc32c::crc32c(&self.data[start..end]) + } + + /// Get the schema ID. + /// Returns 0 if the batch is too short to contain a valid header. + pub fn schema_id(&self) -> i16 { + if self.data.len() + < self + .position + .saturating_add(SCHEMA_ID_OFFSET) + .saturating_add(2) + { + return 0; + } + i16::from_be_bytes([ + self.data[self.position + SCHEMA_ID_OFFSET], + self.data[self.position + SCHEMA_ID_OFFSET + 1], + ]) + } + + /// Get the attributes byte. + /// Returns 0 if the batch is too short to contain a valid header. + pub fn attributes(&self) -> u8 { + if self.data.len() + < self + .position + .saturating_add(ATTRIBUTES_OFFSET) + .saturating_add(1) + { + return 0; + } + self.data[self.position + ATTRIBUTES_OFFSET] + } + + /// Get the writer ID. + /// Returns 0 if the batch is too short to contain a valid header. + pub fn writer_id(&self) -> i64 { + if self.data.len() + < self + .position + .saturating_add(WRITER_ID_OFFSET) + .saturating_add(8) + { + return 0; + } + i64::from_be_bytes([ Review Comment: Similar comment on endianess ########## crates/fluss/src/record/kv/kv_record_batch.rs: ########## @@ -0,0 +1,370 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! KV record batch implementation. +//! +//! The schema of a KvRecordBatch is: +//! - Length => Int32 +//! - Magic => Int8 +//! - CRC => Uint32 +//! - SchemaId => Int16 +//! - Attributes => Int8 +//! - WriterId => Int64 +//! - BatchSequence => Int32 +//! - RecordCount => Int32 +//! - Records => [Record] +//! +//! The CRC covers data from the SchemaId to the end of the batch. + +use bytes::Bytes; +use std::io; + +use crate::record::kv::KvRecord; + +// Field lengths in bytes +pub const LENGTH_LENGTH: usize = 4; +pub const MAGIC_LENGTH: usize = 1; +pub const CRC_LENGTH: usize = 4; +pub const SCHEMA_ID_LENGTH: usize = 2; +pub const ATTRIBUTE_LENGTH: usize = 1; +pub const WRITER_ID_LENGTH: usize = 8; +pub const BATCH_SEQUENCE_LENGTH: usize = 4; +pub const RECORDS_COUNT_LENGTH: usize = 4; + +// Field offsets +pub const LENGTH_OFFSET: usize = 0; +pub const MAGIC_OFFSET: usize = LENGTH_OFFSET + LENGTH_LENGTH; +pub const CRC_OFFSET: usize = MAGIC_OFFSET + MAGIC_LENGTH; +pub const SCHEMA_ID_OFFSET: usize = CRC_OFFSET + CRC_LENGTH; +pub const ATTRIBUTES_OFFSET: usize = SCHEMA_ID_OFFSET + SCHEMA_ID_LENGTH; +pub const WRITER_ID_OFFSET: usize = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH; +pub const BATCH_SEQUENCE_OFFSET: usize = WRITER_ID_OFFSET + WRITER_ID_LENGTH; +pub const RECORDS_COUNT_OFFSET: usize = BATCH_SEQUENCE_OFFSET + BATCH_SEQUENCE_LENGTH; +pub const RECORDS_OFFSET: usize = RECORDS_COUNT_OFFSET + RECORDS_COUNT_LENGTH; + +/// Total header size +pub const RECORD_BATCH_HEADER_SIZE: usize = RECORDS_OFFSET; + +/// Overhead of the batch (length field) +pub const KV_OVERHEAD: usize = LENGTH_OFFSET + LENGTH_LENGTH; + +/// A KV record batch. +/// +/// This struct provides read access to a serialized KV record batch. +pub struct KvRecordBatch { + data: Bytes, + position: usize, +} + +impl KvRecordBatch { + /// Create a new KvRecordBatch pointing to the given data at the specified position. + pub fn new(data: Bytes, position: usize) -> Self { + Self { data, position } + } + + /// Get the size in bytes of this batch. + /// Returns 0 if the batch is invalid or length is negative. + pub fn size_in_bytes(&self) -> usize { + if self.data.len() < self.position.saturating_add(LENGTH_LENGTH) { + return 0; + } + let length_i32 = i32::from_be_bytes([ + self.data[self.position], + self.data[self.position + 1], + self.data[self.position + 2], + self.data[self.position + 3], + ]); + + if length_i32 < 0 { + return 0; + } + + let length = length_i32 as usize; + + length.saturating_add(LENGTH_LENGTH) + } + + /// Check if this batch is valid by verifying the checksum. + pub fn is_valid(&self) -> bool { + if self.size_in_bytes() < RECORD_BATCH_HEADER_SIZE { + return false; + } + if self.data.len() < self.position.saturating_add(RECORD_BATCH_HEADER_SIZE) { + return false; + } + + if self.record_count() < 0 { + return false; + } + self.checksum() == self.compute_checksum() + } + + /// Get the magic byte. + /// Returns 0 if the batch is too short to contain a valid header. 
+ pub fn magic(&self) -> u8 { + if self.data.len() < self.position.saturating_add(MAGIC_OFFSET).saturating_add(1) { + return 0; + } + self.data[self.position + MAGIC_OFFSET] + } + + /// Get the checksum. + /// Returns 0 if the batch is too short to contain a valid header. + pub fn checksum(&self) -> u32 { + if self.data.len() < self.position.saturating_add(CRC_OFFSET).saturating_add(4) { + return 0; + } + u32::from_be_bytes([ + self.data[self.position + CRC_OFFSET], + self.data[self.position + CRC_OFFSET + 1], + self.data[self.position + CRC_OFFSET + 2], + self.data[self.position + CRC_OFFSET + 3], + ]) + } + + /// Compute the checksum of this batch. + pub fn compute_checksum(&self) -> u32 { + let size = self.size_in_bytes(); + if size < RECORD_BATCH_HEADER_SIZE { + return 0; + } + + let start = self.position.saturating_add(SCHEMA_ID_OFFSET); + let end = self.position.saturating_add(size); + + if end > self.data.len() || start >= end { + return 0; + } + + crc32c::crc32c(&self.data[start..end]) + } + + /// Get the schema ID. + /// Returns 0 if the batch is too short to contain a valid header. + pub fn schema_id(&self) -> i16 { + if self.data.len() + < self + .position + .saturating_add(SCHEMA_ID_OFFSET) + .saturating_add(2) + { + return 0; + } + i16::from_be_bytes([ + self.data[self.position + SCHEMA_ID_OFFSET], + self.data[self.position + SCHEMA_ID_OFFSET + 1], + ]) + } + + /// Get the attributes byte. + /// Returns 0 if the batch is too short to contain a valid header. + pub fn attributes(&self) -> u8 { + if self.data.len() + < self + .position + .saturating_add(ATTRIBUTES_OFFSET) + .saturating_add(1) + { + return 0; + } + self.data[self.position + ATTRIBUTES_OFFSET] + } + + /// Get the writer ID. + /// Returns 0 if the batch is too short to contain a valid header. + pub fn writer_id(&self) -> i64 { + if self.data.len() + < self + .position + .saturating_add(WRITER_ID_OFFSET) + .saturating_add(8) + { + return 0; + } + i64::from_be_bytes([ + self.data[self.position + WRITER_ID_OFFSET], + self.data[self.position + WRITER_ID_OFFSET + 1], + self.data[self.position + WRITER_ID_OFFSET + 2], + self.data[self.position + WRITER_ID_OFFSET + 3], + self.data[self.position + WRITER_ID_OFFSET + 4], + self.data[self.position + WRITER_ID_OFFSET + 5], + self.data[self.position + WRITER_ID_OFFSET + 6], + self.data[self.position + WRITER_ID_OFFSET + 7], + ]) + } + + /// Get the batch sequence. + /// Returns 0 if the batch is too short to contain a valid header. + pub fn batch_sequence(&self) -> i32 { + if self.data.len() + < self + .position + .saturating_add(BATCH_SEQUENCE_OFFSET) + .saturating_add(4) + { + return 0; + } + i32::from_be_bytes([ + self.data[self.position + BATCH_SEQUENCE_OFFSET], + self.data[self.position + BATCH_SEQUENCE_OFFSET + 1], + self.data[self.position + BATCH_SEQUENCE_OFFSET + 2], + self.data[self.position + BATCH_SEQUENCE_OFFSET + 3], + ]) + } + + /// Get the number of records in this batch. + /// Returns 0 if the batch is too short to contain a valid header. + pub fn record_count(&self) -> i32 { Review Comment: Similar comment on returning Err for the rest of doc instead of returning 0 for erroneous case ########## crates/fluss/src/record/kv/kv_record_batch.rs: ########## @@ -0,0 +1,370 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! KV record batch implementation. +//! +//! The schema of a KvRecordBatch is: +//! - Length => Int32 +//! - Magic => Int8 +//! - CRC => Uint32 +//! - SchemaId => Int16 +//! - Attributes => Int8 +//! - WriterId => Int64 +//! - BatchSequence => Int32 +//! - RecordCount => Int32 +//! - Records => [Record] +//! +//! The CRC covers data from the SchemaId to the end of the batch. + +use bytes::Bytes; +use std::io; + +use crate::record::kv::KvRecord; + +// Field lengths in bytes +pub const LENGTH_LENGTH: usize = 4; +pub const MAGIC_LENGTH: usize = 1; +pub const CRC_LENGTH: usize = 4; +pub const SCHEMA_ID_LENGTH: usize = 2; +pub const ATTRIBUTE_LENGTH: usize = 1; +pub const WRITER_ID_LENGTH: usize = 8; +pub const BATCH_SEQUENCE_LENGTH: usize = 4; +pub const RECORDS_COUNT_LENGTH: usize = 4; + +// Field offsets +pub const LENGTH_OFFSET: usize = 0; +pub const MAGIC_OFFSET: usize = LENGTH_OFFSET + LENGTH_LENGTH; +pub const CRC_OFFSET: usize = MAGIC_OFFSET + MAGIC_LENGTH; +pub const SCHEMA_ID_OFFSET: usize = CRC_OFFSET + CRC_LENGTH; +pub const ATTRIBUTES_OFFSET: usize = SCHEMA_ID_OFFSET + SCHEMA_ID_LENGTH; +pub const WRITER_ID_OFFSET: usize = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH; Review Comment: nit: use same constant name as Java side WRITER_CLIENT_ID_OFFSET ########## crates/fluss/src/record/kv/kv_record_batch_builder.rs: ########## @@ -0,0 +1,659 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! KV record batch builder implementation. +//! +//! This module provides the KvRecordBatchBuilder for building batches of KV records. + +use bytes::{Bytes, BytesMut}; +use std::io; + +use crate::metadata::KvFormat; +use crate::record::kv::kv_record::KvRecord; +use crate::record::kv::kv_record_batch::{ + ATTRIBUTES_OFFSET, BATCH_SEQUENCE_OFFSET, CRC_OFFSET, LENGTH_LENGTH, LENGTH_OFFSET, + MAGIC_OFFSET, RECORD_BATCH_HEADER_SIZE, RECORDS_COUNT_OFFSET, SCHEMA_ID_OFFSET, + WRITER_ID_OFFSET, +}; +use crate::record::kv::{CURRENT_KV_MAGIC_VALUE, NO_BATCH_SEQUENCE, NO_WRITER_ID}; +use crate::row::compacted::CompactedRowWriter; + +/// Builder for KvRecordBatch. 
+/// +/// This builder accumulates KV records and produces a serialized batch with proper +/// header information and checksums. +pub struct KvRecordBatchBuilder { + schema_id: i32, + magic: u8, + write_limit: usize, + buffer: BytesMut, + writer_id: i64, + batch_sequence: i32, + current_record_number: i32, + size_in_bytes: usize, + is_closed: bool, + kv_format: KvFormat, + aborted: bool, + built_buffer: Option<Bytes>, +} + +impl KvRecordBatchBuilder { + /// Create a new KvRecordBatchBuilder. + /// + /// # Arguments + /// * `schema_id` - The schema ID for records in this batch (must fit in i16) + /// * `write_limit` - Maximum bytes that can be appended + /// * `kv_format` - The KV format (Compacted, Indexed, or Aligned) + pub fn new(schema_id: i32, write_limit: usize, kv_format: KvFormat) -> Self { + assert!( + schema_id <= i16::MAX as i32, + "schema_id shouldn't be greater than the max value of i16: {}", + i16::MAX + ); + + let mut buffer = BytesMut::with_capacity(write_limit.max(RECORD_BATCH_HEADER_SIZE)); + + // Reserve space for header (we'll write it at the end) + buffer.resize(RECORD_BATCH_HEADER_SIZE, 0); + + Self { + schema_id, + magic: CURRENT_KV_MAGIC_VALUE, + write_limit, + buffer, + writer_id: NO_WRITER_ID, + batch_sequence: NO_BATCH_SEQUENCE, + current_record_number: 0, + size_in_bytes: RECORD_BATCH_HEADER_SIZE, + is_closed: false, + kv_format, + aborted: false, + built_buffer: None, + } + } + + /// Check if there is room for a new record containing the given key and value. + /// If no records have been appended, this always returns true. + pub fn has_room_for(&self, key: &[u8], value: Option<&[u8]>) -> bool { Review Comment: Should we define a BinaryRow trait?
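Picking up the question above: a minimal sketch of what such a trait might look like, purely as a discussion aid. The trait name `BinaryRow`, its methods, the `RawRow` example type, and the free-standing `has_room_for` below are all hypothetical and not taken from the existing Fluss Java or Rust code; the real check would stay a method on `KvRecordBatchBuilder`.

```rust
// Hypothetical sketch: the trait, the RawRow example type, and this free
// function are illustrative only and not part of the PR.
pub trait BinaryRow {
    /// Serialized form of the row.
    fn as_bytes(&self) -> &[u8];

    /// Size of the serialized row in bytes.
    fn size_in_bytes(&self) -> usize {
        self.as_bytes().len()
    }
}

/// Example implementation over an owned, already-encoded byte buffer.
struct RawRow(Vec<u8>);

impl BinaryRow for RawRow {
    fn as_bytes(&self) -> &[u8] {
        &self.0
    }
}

/// How a room check could be phrased against the trait instead of raw slices.
fn has_room_for(
    record_count: i32,
    current_size: usize,
    write_limit: usize,
    key: &dyn BinaryRow,
    value: Option<&dyn BinaryRow>,
) -> bool {
    let record_size = key.size_in_bytes() + value.map_or(0, |v| v.size_in_bytes());
    // Mirror the documented behaviour: an empty batch always accepts its first record.
    record_count == 0 || current_size + record_size <= write_limit
}

fn main() {
    let key = RawRow(b"k1".to_vec());
    let value = RawRow(b"v1".to_vec());
    // The first record always fits; later records must respect the write limit.
    assert!(has_room_for(0, 28, 16, &key, Some(&value)));
    assert!(!has_room_for(1, 28, 16, &key, Some(&value)));
}
```

Whether a trait pays off probably depends on whether the builder should eventually accept row values (for example compacted rows) directly rather than pre-encoded byte slices.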

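On the recurring suggestion in this thread to return `Err` instead of a silent 0 when the buffer is too short, a minimal sketch of the shape such an accessor could take, written as a free function over a byte slice so it stands alone. The function name `read_schema_id` and the use of `std::io::Error` are placeholders (the crate's own error type is likely the better fit); the offset value follows the header layout quoted above.

```rust
use std::io;

// Length(4) + Magic(1) + CRC(4) precede the schema id, matching the layout above.
const SCHEMA_ID_OFFSET: usize = 9;

/// Hypothetical Result-returning accessor: a truncated buffer becomes an explicit
/// error instead of a silent 0 that only surfaces as a failure much later.
fn read_schema_id(data: &[u8], position: usize) -> io::Result<i16> {
    let start = position
        .checked_add(SCHEMA_ID_OFFSET)
        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "offset overflow"))?;
    let bytes = data
        .get(start..)
        .and_then(|tail| tail.get(..2))
        .ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "batch too short for schema id"))?;
    Ok(i16::from_be_bytes([bytes[0], bytes[1]]))
}

fn main() {
    // Too short for a header: the caller sees the failure immediately.
    assert!(read_schema_id(&[0u8; 4], 0).is_err());

    // 28 zero bytes cover the fixed header; set the schema id bytes to 0x0007.
    let mut header = vec![0u8; 28];
    header[SCHEMA_ID_OFFSET] = 0x00;
    header[SCHEMA_ID_OFFSET + 1] = 0x07;
    assert_eq!(read_schema_id(&header, 0).unwrap(), 7);
}
```

A similar shape would work for the validity check (for example `fn validate(&self) -> Result<()>`), so callers can tell why a batch was rejected rather than just that it was.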