This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 9c41e2c  chore: correct log record batch checksum bounds (#172)
9c41e2c is described below

commit 9c41e2c1c5075b8099a78288873a5ddd1eaa98af
Author: AlexZhao <[email protected]>
AuthorDate: Sat Jan 17 15:37:07 2026 +0800

    chore: correct log record batch checksum bounds (#172)
---
 crates/fluss/src/record/arrow.rs | 30 ++++++++++++++++++++++++++----
 1 file changed, 26 insertions(+), 4 deletions(-)

diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index c166ebe..b331ae9 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -86,6 +86,8 @@ pub enum LogMagicValue {
     V0 = 0,
 }
 
+// NOTE: Rust layout/offsets currently match Java only for V0.
+// TODO: Add V1 layout/offsets to keep parity with Java's V1 format.
 pub const CURRENT_LOG_MAGIC_VALUE: u8 = LogMagicValue::V0 as u8;
 
 /// Value used if writer ID is not available or non-idempotent.
@@ -457,8 +459,7 @@ impl LogRecordBatch {
 
     fn compute_checksum(&self) -> u32 {
         let start = SCHEMA_ID_OFFSET;
-        let end = start + self.data.len();
-        crc32c(&self.data[start..end])
+        crc32c(&self.data[start..])
     }
 
     fn attributes(&self) -> u8 {
@@ -471,12 +472,12 @@ impl LogRecordBatch {
 
     pub fn checksum(&self) -> u32 {
         let offset = CRC_OFFSET;
-        LittleEndian::read_u32(&self.data[offset..offset + CRC_OFFSET])
+        LittleEndian::read_u32(&self.data[offset..offset + CRC_LENGTH])
     }
 
     pub fn schema_id(&self) -> i16 {
         let offset = SCHEMA_ID_OFFSET;
-        LittleEndian::read_i16(&self.data[offset..offset + SCHEMA_ID_OFFSET])
+        LittleEndian::read_i16(&self.data[offset..offset + SCHEMA_ID_LENGTH])
     }
 
     pub fn base_log_offset(&self) -> i64 {
@@ -1240,6 +1241,27 @@ mod tests {
         assert!(matches!(result, Err(Error::IllegalArgument { .. })));
     }
 
+    #[test]
+    fn checksum_and_schema_id_read_minimum_header() {
+        // Header-only batches with record_count == 0 are valid; this covers the minimal bytes
+        // needed for checksum/schema_id access.
+        let mut data = vec![0u8; SCHEMA_ID_OFFSET + SCHEMA_ID_LENGTH];
+        let crc = 0xA1B2C3D4u32;
+        let schema_id = 42i16;
+        LittleEndian::write_u32(&mut data[CRC_OFFSET..CRC_OFFSET + CRC_LENGTH], crc);
+        LittleEndian::write_i16(
+            &mut data[SCHEMA_ID_OFFSET..SCHEMA_ID_OFFSET + SCHEMA_ID_LENGTH],
+            schema_id,
+        );
+
+        let batch = LogRecordBatch::new(Bytes::from(data));
+        assert_eq!(batch.checksum(), crc);
+        assert_eq!(batch.schema_id(), schema_id);
+
+        let expected = crc32c(&batch.data[SCHEMA_ID_OFFSET..]);
+        assert_eq!(batch.compute_checksum(), expected);
+    }
+
     fn le_bytes(vals: &[u32]) -> Vec<u8> {
         let mut out = Vec::with_capacity(vals.len() * 4);
         for &v in vals {

Reply via email to