This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 9c41e2c chore: correct log record batch checksum bounds (#172)
9c41e2c is described below
commit 9c41e2c1c5075b8099a78288873a5ddd1eaa98af
Author: AlexZhao <[email protected]>
AuthorDate: Sat Jan 17 15:37:07 2026 +0800
chore: correct log record batch checksum bounds (#172)
---
crates/fluss/src/record/arrow.rs | 30 ++++++++++++++++++++++++++----
1 file changed, 26 insertions(+), 4 deletions(-)
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index c166ebe..b331ae9 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -86,6 +86,8 @@ pub enum LogMagicValue {
V0 = 0,
}
+// NOTE: Rust layout/offsets currently match Java only for V0.
+// TODO: Add V1 layout/offsets to keep parity with Java's V1 format.
pub const CURRENT_LOG_MAGIC_VALUE: u8 = LogMagicValue::V0 as u8;
/// Value used if writer ID is not available or non-idempotent.
@@ -457,8 +459,7 @@ impl LogRecordBatch {
fn compute_checksum(&self) -> u32 {
let start = SCHEMA_ID_OFFSET;
- let end = start + self.data.len();
- crc32c(&self.data[start..end])
+ crc32c(&self.data[start..])
}
fn attributes(&self) -> u8 {
@@ -471,12 +472,12 @@ impl LogRecordBatch {
pub fn checksum(&self) -> u32 {
let offset = CRC_OFFSET;
- LittleEndian::read_u32(&self.data[offset..offset + CRC_OFFSET])
+ LittleEndian::read_u32(&self.data[offset..offset + CRC_LENGTH])
}
pub fn schema_id(&self) -> i16 {
let offset = SCHEMA_ID_OFFSET;
- LittleEndian::read_i16(&self.data[offset..offset + SCHEMA_ID_OFFSET])
+ LittleEndian::read_i16(&self.data[offset..offset + SCHEMA_ID_LENGTH])
}
pub fn base_log_offset(&self) -> i64 {
@@ -1240,6 +1241,27 @@ mod tests {
assert!(matches!(result, Err(Error::IllegalArgument { .. })));
}
+ #[test]
+ fn checksum_and_schema_id_read_minimum_header() {
+ // Header-only batches with record_count == 0 are valid; this covers the minimal bytes
+ // needed for checksum/schema_id access.
+ let mut data = vec![0u8; SCHEMA_ID_OFFSET + SCHEMA_ID_LENGTH];
+ let crc = 0xA1B2C3D4u32;
+ let schema_id = 42i16;
+ LittleEndian::write_u32(&mut data[CRC_OFFSET..CRC_OFFSET + CRC_LENGTH], crc);
+ LittleEndian::write_i16(
+ &mut data[SCHEMA_ID_OFFSET..SCHEMA_ID_OFFSET + SCHEMA_ID_LENGTH],
+ schema_id,
+ );
+
+ let batch = LogRecordBatch::new(Bytes::from(data));
+ assert_eq!(batch.checksum(), crc);
+ assert_eq!(batch.schema_id(), schema_id);
+
+ let expected = crc32c(&batch.data[SCHEMA_ID_OFFSET..]);
+ assert_eq!(batch.compute_checksum(), expected);
+ }
+
fn le_bytes(vals: &[u32]) -> Vec<u8> {
let mut out = Vec::with_capacity(vals.len() * 4);
for &v in vals {