Copilot commented on code in PR #121:
URL: https://github.com/apache/fluss-rust/pull/121#discussion_r2658676657
##########
crates/fluss/src/row/compacted/compacted_row_writer.rs:
##########
@@ -0,0 +1,127 @@
+use std::cmp;
+
+use bytes::{Bytes, BytesMut};
+
+pub struct CompactedRowWriter {
+ header_size_in_bytes: usize,
+ position: usize,
+ buffer: BytesMut,
+}
+
+impl CompactedRowWriter {
+ pub const MAX_INT_SIZE: usize = 5;
+ pub const MAX_LONG_SIZE: usize = 10;
+
+ pub fn new(field_count: usize) -> Self {
+ // bitset width in bytes, it should be in CompactedRow
+ let header_size = (field_count + 7) / 8;
+ let cap = cmp::max(64, header_size);
+
+ let mut buffer = BytesMut::with_capacity(cap);
+ buffer.resize(cap, 0);
+
+ Self {
+ header_size_in_bytes: header_size,
+ position: header_size,
+ buffer,
+ }
+ }
+
+ pub fn reset(&mut self) {
+ self.position = self.header_size_in_bytes;
+ self.buffer[..self.header_size_in_bytes].fill(0);
+ }
+
+ pub fn position(&self) -> usize {
+ self.position
+ }
+
+ pub fn buffer(&self) -> &[u8] {
+ &self.buffer[..self.position]
+ }
+
+ pub fn to_bytes(&self) -> Bytes {
+ Bytes::copy_from_slice(&self.buffer[..self.position])
+ }
+
+ fn ensure_len(&mut self, need_len: usize) {
+ if self.buffer.len() < need_len {
+ self.buffer.resize(need_len, 0);
+ }
+ }
+
+ fn write_raw(&mut self, src: &[u8]) {
+ let end = self.position + src.len();
+ self.ensure_len(end);
+ self.buffer[self.position..end].copy_from_slice(src);
+ self.position = end;
+ }
+
+ pub fn set_null_at(&mut self, pos: usize) {
+ let byte_index = pos >> 3;
+ let bit = pos & 7;
+ debug_assert!(byte_index < self.header_size_in_bytes);
+ self.buffer[byte_index] |= 1u8 << bit;
+ }
+
+ pub fn write_boolean(&mut self, value: bool) {
+ let b = if value { 1u8 } else { 0u8 };
+ self.write_raw(&[b]);
+ }
+
+ pub fn write_byte(&mut self, value: i8) {
+ self.write_raw(&[value as u8]);
+ }
+
+ pub fn write_binary(&mut self, bytes: &[u8], length: usize) {
+ self.write_bytes(&bytes[..length.min(bytes.len())]);
+ }
+
+ pub fn write_bytes(&mut self, value: &[u8]) {
+ self.write_int(value.len() as i32);
+ self.write_raw(value);
+ }
+
+ pub fn write_char(&mut self, value: &str, length: usize) {
+ self.write_string(value);
Review Comment:
The parameter 'length' is unused in this function (rustc also reports it via
the `unused_variables` lint). The function currently
ignores the length parameter and just writes the entire string. If the intent
is to truncate or pad the string to a specific length (as might be expected for
CHAR types in databases), this behavior should be implemented. Otherwise, if
the parameter is not needed, it should be removed from the function signature.
```suggestion
// Truncate to at most `length` characters and pad with spaces to
// exactly `length`.
let mut s: String = value.chars().take(length).collect();
let mut char_count = s.chars().count();
if char_count < length {
s.reserve(length - char_count);
while char_count < length {
s.push(' ');
char_count += 1;
}
}
self.write_string(&s);
```
##########
crates/fluss/src/row/compacted/compacted_row_writer.rs:
##########
@@ -0,0 +1,127 @@
+use std::cmp;
Review Comment:
Missing Apache License header. All other source files in this crate include
the standard Apache Software Foundation license header at the beginning of the
file (lines 1-16). This file should have the same license header for
consistency with the rest of the codebase.
##########
crates/fluss/src/row/compacted/compacted_row_writer.rs:
##########
@@ -0,0 +1,127 @@
+use std::cmp;
+
+use bytes::{Bytes, BytesMut};
+
+pub struct CompactedRowWriter {
+ header_size_in_bytes: usize,
+ position: usize,
+ buffer: BytesMut,
+}
+
+impl CompactedRowWriter {
+ pub const MAX_INT_SIZE: usize = 5;
+ pub const MAX_LONG_SIZE: usize = 10;
+
+ pub fn new(field_count: usize) -> Self {
+ // bitset width in bytes, it should be in CompactedRow
+ let header_size = (field_count + 7) / 8;
+ let cap = cmp::max(64, header_size);
+
+ let mut buffer = BytesMut::with_capacity(cap);
+ buffer.resize(cap, 0);
+
+ Self {
+ header_size_in_bytes: header_size,
+ position: header_size,
+ buffer,
+ }
+ }
+
+ pub fn reset(&mut self) {
+ self.position = self.header_size_in_bytes;
+ self.buffer[..self.header_size_in_bytes].fill(0);
+ }
+
+ pub fn position(&self) -> usize {
+ self.position
+ }
+
+ pub fn buffer(&self) -> &[u8] {
+ &self.buffer[..self.position]
+ }
+
+ pub fn to_bytes(&self) -> Bytes {
+ Bytes::copy_from_slice(&self.buffer[..self.position])
+ }
+
+ fn ensure_len(&mut self, need_len: usize) {
+ if self.buffer.len() < need_len {
+ self.buffer.resize(need_len, 0);
+ }
+ }
+
+ fn write_raw(&mut self, src: &[u8]) {
+ let end = self.position + src.len();
+ self.ensure_len(end);
+ self.buffer[self.position..end].copy_from_slice(src);
+ self.position = end;
+ }
+
+ pub fn set_null_at(&mut self, pos: usize) {
+ let byte_index = pos >> 3;
+ let bit = pos & 7;
+ debug_assert!(byte_index < self.header_size_in_bytes);
+ self.buffer[byte_index] |= 1u8 << bit;
+ }
+
+ pub fn write_boolean(&mut self, value: bool) {
+ let b = if value { 1u8 } else { 0u8 };
+ self.write_raw(&[b]);
+ }
+
+ pub fn write_byte(&mut self, value: i8) {
+ self.write_raw(&[value as u8]);
+ }
+
+ pub fn write_binary(&mut self, bytes: &[u8], length: usize) {
+ self.write_bytes(&bytes[..length.min(bytes.len())]);
+ }
+
+ pub fn write_bytes(&mut self, value: &[u8]) {
+ self.write_int(value.len() as i32);
+ self.write_raw(value);
+ }
+
+ pub fn write_char(&mut self, value: &str, length: usize) {
+ self.write_string(value);
+ }
+
+ pub fn write_string(&mut self, value: &str) {
+ self.write_bytes(value.as_ref());
+ }
+
+ pub fn write_short(&mut self, value: i16) {
+ self.write_raw(&value.to_be_bytes());
+ }
+
+ pub fn write_int(&mut self, value: i32) {
+ self.ensure_len(self.position + Self::MAX_INT_SIZE);
+ let mut v = value as u32;
+ while (v & !0x7F) != 0 {
+ self.buffer[self.position] = ((v as u8) & 0x7F) | 0x80;
+ self.position += 1;
+ v >>= 7;
+ }
+ self.buffer[self.position] = v as u8;
+ self.position += 1;
+ }
+ pub fn write_long(&mut self, value: i64) {
+ self.ensure_len(self.position + Self::MAX_LONG_SIZE);
+ let mut v = value as u64;
+ while (v & !0x7F) != 0 {
+ self.buffer[self.position] = ((v as u8) & 0x7F) | 0x80;
+ self.position += 1;
+ v >>= 7;
+ }
+ self.buffer[self.position] = v as u8;
+ self.position += 1;
+ }
+
+ pub fn write_float(&mut self, value: f32) {
+ self.write_raw(&value.to_be_bytes());
+ }
+
+ pub fn write_double(&mut self, value: f64) {
+ self.write_raw(&value.to_be_bytes());
+ }
+}
Review Comment:
Missing documentation for the public struct and its public methods. This is
a new public API that should have documentation comments explaining: (1) what
CompactedRowWriter is used for, (2) the format it writes, (3) how to use it,
and (4) what each public method does including their parameters and behavior.
Other public types in the codebase follow this pattern.
##########
crates/fluss/src/row/compacted/compacted_row_writer.rs:
##########
@@ -0,0 +1,127 @@
+use std::cmp;
+
+use bytes::{Bytes, BytesMut};
+
+pub struct CompactedRowWriter {
+ header_size_in_bytes: usize,
+ position: usize,
+ buffer: BytesMut,
+}
+
+impl CompactedRowWriter {
+ pub const MAX_INT_SIZE: usize = 5;
+ pub const MAX_LONG_SIZE: usize = 10;
+
+ pub fn new(field_count: usize) -> Self {
+ // bitset width in bytes, it should be in CompactedRow
+ let header_size = (field_count + 7) / 8;
+ let cap = cmp::max(64, header_size);
+
+ let mut buffer = BytesMut::with_capacity(cap);
+ buffer.resize(cap, 0);
+
+ Self {
+ header_size_in_bytes: header_size,
+ position: header_size,
+ buffer,
+ }
+ }
+
+ pub fn reset(&mut self) {
+ self.position = self.header_size_in_bytes;
+ self.buffer[..self.header_size_in_bytes].fill(0);
+ }
+
+ pub fn position(&self) -> usize {
+ self.position
+ }
+
+ pub fn buffer(&self) -> &[u8] {
+ &self.buffer[..self.position]
+ }
+
+ pub fn to_bytes(&self) -> Bytes {
+ Bytes::copy_from_slice(&self.buffer[..self.position])
+ }
+
+ fn ensure_len(&mut self, need_len: usize) {
+ if self.buffer.len() < need_len {
+ self.buffer.resize(need_len, 0);
+ }
+ }
+
+ fn write_raw(&mut self, src: &[u8]) {
+ let end = self.position + src.len();
+ self.ensure_len(end);
+ self.buffer[self.position..end].copy_from_slice(src);
+ self.position = end;
+ }
+
+ pub fn set_null_at(&mut self, pos: usize) {
+ let byte_index = pos >> 3;
+ let bit = pos & 7;
+ debug_assert!(byte_index < self.header_size_in_bytes);
+ self.buffer[byte_index] |= 1u8 << bit;
+ }
+
+ pub fn write_boolean(&mut self, value: bool) {
+ let b = if value { 1u8 } else { 0u8 };
+ self.write_raw(&[b]);
+ }
+
+ pub fn write_byte(&mut self, value: i8) {
+ self.write_raw(&[value as u8]);
+ }
+
+ pub fn write_binary(&mut self, bytes: &[u8], length: usize) {
+ self.write_bytes(&bytes[..length.min(bytes.len())]);
+ }
+
+ pub fn write_bytes(&mut self, value: &[u8]) {
+ self.write_int(value.len() as i32);
+ self.write_raw(value);
+ }
+
+ pub fn write_char(&mut self, value: &str, length: usize) {
+ self.write_string(value);
+ }
+
+ pub fn write_string(&mut self, value: &str) {
+ self.write_bytes(value.as_ref());
+ }
+
+ pub fn write_short(&mut self, value: i16) {
+ self.write_raw(&value.to_be_bytes());
+ }
+
+ pub fn write_int(&mut self, value: i32) {
+ self.ensure_len(self.position + Self::MAX_INT_SIZE);
+ let mut v = value as u32;
+ while (v & !0x7F) != 0 {
+ self.buffer[self.position] = ((v as u8) & 0x7F) | 0x80;
+ self.position += 1;
+ v >>= 7;
+ }
+ self.buffer[self.position] = v as u8;
+ self.position += 1;
+ }
Review Comment:
The variable-length encoding for signed integers may be incorrect. Casting a
negative signed integer directly to unsigned (e.g., `-1i32 as u32` is
0xFFFFFFFF) sets the high bits, so every negative value takes the full
MAX_INT_SIZE (5) bytes. Typically, variable-length encoding of signed integers should use zigzag
encoding to map negative values efficiently. For example, zigzag encoding maps
0 to 0, -1 to 1, 1 to 2, -2 to 3, etc. This allows small negative numbers to be
encoded compactly. Verify this matches the Java reference implementation's
behavior, and if zigzag encoding is expected, implement it here.
##########
crates/fluss/src/row/compacted/compacted_row_writer.rs:
##########
@@ -0,0 +1,127 @@
+use std::cmp;
+
+use bytes::{Bytes, BytesMut};
+
+pub struct CompactedRowWriter {
+ header_size_in_bytes: usize,
+ position: usize,
+ buffer: BytesMut,
+}
+
+impl CompactedRowWriter {
+ pub const MAX_INT_SIZE: usize = 5;
+ pub const MAX_LONG_SIZE: usize = 10;
+
+ pub fn new(field_count: usize) -> Self {
+ // bitset width in bytes, it should be in CompactedRow
+ let header_size = (field_count + 7) / 8;
+ let cap = cmp::max(64, header_size);
+
+ let mut buffer = BytesMut::with_capacity(cap);
+ buffer.resize(cap, 0);
+
+ Self {
+ header_size_in_bytes: header_size,
+ position: header_size,
+ buffer,
+ }
+ }
+
+ pub fn reset(&mut self) {
+ self.position = self.header_size_in_bytes;
+ self.buffer[..self.header_size_in_bytes].fill(0);
+ }
+
+ pub fn position(&self) -> usize {
+ self.position
+ }
+
+ pub fn buffer(&self) -> &[u8] {
+ &self.buffer[..self.position]
+ }
+
+ pub fn to_bytes(&self) -> Bytes {
+ Bytes::copy_from_slice(&self.buffer[..self.position])
+ }
+
+ fn ensure_len(&mut self, need_len: usize) {
+ if self.buffer.len() < need_len {
+ self.buffer.resize(need_len, 0);
+ }
+ }
+
+ fn write_raw(&mut self, src: &[u8]) {
+ let end = self.position + src.len();
+ self.ensure_len(end);
+ self.buffer[self.position..end].copy_from_slice(src);
+ self.position = end;
+ }
+
+ pub fn set_null_at(&mut self, pos: usize) {
+ let byte_index = pos >> 3;
+ let bit = pos & 7;
+ debug_assert!(byte_index < self.header_size_in_bytes);
+ self.buffer[byte_index] |= 1u8 << bit;
+ }
+
+ pub fn write_boolean(&mut self, value: bool) {
+ let b = if value { 1u8 } else { 0u8 };
+ self.write_raw(&[b]);
+ }
+
+ pub fn write_byte(&mut self, value: i8) {
+ self.write_raw(&[value as u8]);
+ }
+
+ pub fn write_binary(&mut self, bytes: &[u8], length: usize) {
+ self.write_bytes(&bytes[..length.min(bytes.len())]);
+ }
+
+ pub fn write_bytes(&mut self, value: &[u8]) {
+ self.write_int(value.len() as i32);
Review Comment:
Potential silent truncation when casting the length to i32. On 64-bit systems,
usize can be larger than i32::MAX (2,147,483,647). Rust `as` casts between
integer types never panic; they truncate, so if value.len() exceeds i32::MAX
this cast yields a negative or truncated value instead of the real length.
Consider adding a check to ensure the length fits in an i32, or use a different
encoding approach for large byte arrays.
```suggestion
let len_i32 = i32::try_from(value.len())
.expect("byte slice too large to encode length as i32");
self.write_int(len_i32);
```
##########
crates/fluss/src/row/compacted/mod.rs:
##########
@@ -0,0 +1,3 @@
+mod compacted_row_writer;
+
+
Review Comment:
Extra blank lines at the end of the file. Remove the trailing blank lines to
follow standard Rust formatting conventions.
```suggestion
```
##########
crates/fluss/src/row/compacted/compacted_row_writer.rs:
##########
@@ -0,0 +1,127 @@
+use std::cmp;
+
+use bytes::{Bytes, BytesMut};
+
+pub struct CompactedRowWriter {
+ header_size_in_bytes: usize,
+ position: usize,
+ buffer: BytesMut,
+}
+
+impl CompactedRowWriter {
+ pub const MAX_INT_SIZE: usize = 5;
+ pub const MAX_LONG_SIZE: usize = 10;
+
+ pub fn new(field_count: usize) -> Self {
+ // bitset width in bytes, it should be in CompactedRow
+ let header_size = (field_count + 7) / 8;
+ let cap = cmp::max(64, header_size);
+
+ let mut buffer = BytesMut::with_capacity(cap);
+ buffer.resize(cap, 0);
+
+ Self {
+ header_size_in_bytes: header_size,
+ position: header_size,
+ buffer,
+ }
+ }
+
+ pub fn reset(&mut self) {
+ self.position = self.header_size_in_bytes;
+ self.buffer[..self.header_size_in_bytes].fill(0);
+ }
+
+ pub fn position(&self) -> usize {
+ self.position
+ }
+
+ pub fn buffer(&self) -> &[u8] {
+ &self.buffer[..self.position]
+ }
+
+ pub fn to_bytes(&self) -> Bytes {
+ Bytes::copy_from_slice(&self.buffer[..self.position])
+ }
+
+ fn ensure_len(&mut self, need_len: usize) {
+ if self.buffer.len() < need_len {
+ self.buffer.resize(need_len, 0);
+ }
+ }
+
+ fn write_raw(&mut self, src: &[u8]) {
+ let end = self.position + src.len();
+ self.ensure_len(end);
+ self.buffer[self.position..end].copy_from_slice(src);
+ self.position = end;
+ }
+
+ pub fn set_null_at(&mut self, pos: usize) {
+ let byte_index = pos >> 3;
+ let bit = pos & 7;
+ debug_assert!(byte_index < self.header_size_in_bytes);
+ self.buffer[byte_index] |= 1u8 << bit;
+ }
+
+ pub fn write_boolean(&mut self, value: bool) {
+ let b = if value { 1u8 } else { 0u8 };
+ self.write_raw(&[b]);
+ }
+
+ pub fn write_byte(&mut self, value: i8) {
+ self.write_raw(&[value as u8]);
+ }
+
+ pub fn write_binary(&mut self, bytes: &[u8], length: usize) {
+ self.write_bytes(&bytes[..length.min(bytes.len())]);
+ }
+
+ pub fn write_bytes(&mut self, value: &[u8]) {
+ self.write_int(value.len() as i32);
+ self.write_raw(value);
+ }
+
+ pub fn write_char(&mut self, value: &str, length: usize) {
+ self.write_string(value);
+ }
+
+ pub fn write_string(&mut self, value: &str) {
+ self.write_bytes(value.as_ref());
+ }
+
+ pub fn write_short(&mut self, value: i16) {
+ self.write_raw(&value.to_be_bytes());
+ }
+
+ pub fn write_int(&mut self, value: i32) {
+ self.ensure_len(self.position + Self::MAX_INT_SIZE);
+ let mut v = value as u32;
+ while (v & !0x7F) != 0 {
+ self.buffer[self.position] = ((v as u8) & 0x7F) | 0x80;
+ self.position += 1;
+ v >>= 7;
+ }
+ self.buffer[self.position] = v as u8;
+ self.position += 1;
+ }
+ pub fn write_long(&mut self, value: i64) {
+ self.ensure_len(self.position + Self::MAX_LONG_SIZE);
+ let mut v = value as u64;
+ while (v & !0x7F) != 0 {
+ self.buffer[self.position] = ((v as u8) & 0x7F) | 0x80;
+ self.position += 1;
+ v >>= 7;
+ }
+ self.buffer[self.position] = v as u8;
+ self.position += 1;
+ }
Review Comment:
The variable-length encoding for signed longs may be incorrect. Casting a
negative signed long directly to unsigned (e.g., `-1i64 as u64` is
0xFFFFFFFFFFFFFFFF) sets the high bits, so every negative value takes the
full MAX_LONG_SIZE (10) bytes. Typically, variable-length encoding of signed integers should use
zigzag encoding to map negative values efficiently. This is the same issue as
in write_int. Verify this matches the Java reference implementation's behavior,
and if zigzag encoding is expected, implement it here.
##########
crates/fluss/src/row/compacted/mod.rs:
##########
@@ -0,0 +1,3 @@
+mod compacted_row_writer;
+
+
Review Comment:
The compacted_row_writer module is declared but not publicly exported. If
CompactedRowWriter is intended to be used by code outside this module (which
appears to be the case since it's a new public API), you should add a public
re-export such as 'pub use compacted_row_writer::*;' or 'pub use
compacted_row_writer::CompactedRowWriter;'.
```suggestion
pub use compacted_row_writer::CompactedRowWriter;
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]