leekeiabstraction commented on code in PR #121:
URL: https://github.com/apache/fluss-rust/pull/121#discussion_r2658768397
##########
crates/fluss/src/row/compacted/compacted_row_writer.rs:
##########
@@ -0,0 +1,127 @@
+use std::cmp;
+
+use bytes::{Bytes, BytesMut};
+
+/// Writer that serializes row fields into a growable byte buffer.
+///
+/// The buffer begins with a null bitset (one bit per field, rounded up to whole
+/// bytes); field data is appended after it.
+pub struct CompactedRowWriter {
+ /// Size in bytes of the null bitset stored at the start of `buffer`.
+ header_size_in_bytes: usize,
+ /// Offset in `buffer` where the next write lands; bytes before it are the row written so far.
+ position: usize,
+ /// Backing storage; its length may exceed `position`.
+ buffer: BytesMut,
+}
+
+impl CompactedRowWriter {
+ pub const MAX_INT_SIZE: usize = 5;
+ pub const MAX_LONG_SIZE: usize = 10;
+
+ /// Creates a writer for a row with `field_count` fields.
+ ///
+ /// The buffer starts zero-filled with a length of at least 64 bytes, and the
+ /// write cursor is placed right after the null-bitset header.
+ pub fn new(field_count: usize) -> Self {
+     // The null bitset needs one bit per field, rounded up to whole bytes.
+     // (This width arguably belongs in CompactedRow.)
+     let bitset_bytes = (field_count + 7) >> 3;
+     let initial_len = cmp::max(bitset_bytes, 64);
+
+     let mut storage = BytesMut::with_capacity(initial_len);
+     storage.resize(initial_len, 0);
+
+     Self {
+         buffer: storage,
+         position: bitset_bytes,
+         header_size_in_bytes: bitset_bytes,
+     }
+ }
+
+ /// Rewinds the writer so the buffer can be reused for another row.
+ ///
+ /// Clears the null bitset and moves the cursor back to just past the header.
+ /// Bytes beyond the header are left untouched; later writes overwrite them.
+ pub fn reset(&mut self) {
+     let header_len = self.header_size_in_bytes;
+     self.buffer[..header_len].fill(0);
+     self.position = header_len;
+ }
+
+ /// Returns the current write offset, i.e. the number of bytes (header included) written so far.
+ pub fn position(&self) -> usize {
+     let Self { position, .. } = self;
+     *position
+ }
+
+ /// Returns the bytes written so far: everything in the buffer before the cursor.
+ pub fn buffer(&self) -> &[u8] {
+     let written = self.position;
+     &self.buffer[..written]
+ }
+
+ /// Copies the written portion of the buffer into a new, independent `Bytes`.
+ pub fn to_bytes(&self) -> Bytes {
+     let written = &self.buffer[..self.position];
+     Bytes::copy_from_slice(written)
+ }
+
+ /// Grows the buffer with zero bytes until it is at least `need_len` long; never shrinks it.
+ fn ensure_len(&mut self, need_len: usize) {
+     if need_len <= self.buffer.len() {
+         return;
+     }
+     self.buffer.resize(need_len, 0);
+ }
+
+ /// Copies `src` verbatim at the cursor, growing the buffer if needed, then advances the cursor.
+ fn write_raw(&mut self, src: &[u8]) {
+     let start = self.position;
+     let stop = start + src.len();
+     self.ensure_len(stop);
+     self.buffer[start..stop].copy_from_slice(src);
+     self.position = stop;
+ }
+
+ /// Marks field `pos` as null by setting its bit in the header bitset.
+ ///
+ /// Bit `pos % 8` of byte `pos / 8` is set. Debug builds assert that this
+ /// byte lies inside the header.
+ pub fn set_null_at(&mut self, pos: usize) {
+     let (byte_index, bit) = (pos / 8, pos % 8);
+     debug_assert!(byte_index < self.header_size_in_bytes);
+     let mask = 1u8 << bit;
+     self.buffer[byte_index] |= mask;
+ }
+
+ /// Writes `value` as a single byte: `1` for `true`, `0` for `false`.
+ pub fn write_boolean(&mut self, value: bool) {
+     self.write_raw(&[u8::from(value)]);
+ }
+
+ /// Writes `value` as one byte, keeping its two's-complement bit pattern.
+ pub fn write_byte(&mut self, value: i8) {
+     // A one-byte integer has a single representation, so byte order is irrelevant here.
+     self.write_raw(&value.to_ne_bytes());
+ }
+
+ /// Writes at most `length` leading bytes of `bytes` via `write_bytes`.
+ ///
+ /// If `length` exceeds `bytes.len()`, the whole slice is written.
+ pub fn write_binary(&mut self, bytes: &[u8], length: usize) {
+     // `get` yields `None` only when `length` is past the end; fall back to the full slice.
+     let truncated = bytes.get(..length).unwrap_or(bytes);
+     self.write_bytes(truncated);
+ }
+
+ /// Writes `value` as a length-prefixed byte sequence: the length through
+ /// `write_int`, followed by the raw bytes.
+ ///
+ /// # Panics
+ ///
+ /// Panics if `value.len()` exceeds `i32::MAX`, because the length prefix is an
+ /// `i32` and a truncated prefix would silently corrupt the row.
+ pub fn write_bytes(&mut self, value: &[u8]) {
+     assert!(
+         value.len() <= i32::MAX as usize,
+         "byte sequence of {} bytes does not fit in an i32 length prefix",
+         value.len()
+     );
+     // The cast is lossless: the bound was checked just above.
+     self.write_int(value.len() as i32);
+     self.write_raw(value);
+ }
+
+ /// Writes a CHAR value; it is currently encoded exactly like a string.
+ ///
+ /// `length` is accepted but not used: the value is neither padded nor
+ /// truncated to it.
+ pub fn write_char(&mut self, value: &str, length: usize) {
+     // Explicitly discard the parameter so it does not raise an unused-variable warning.
+     let _ = length;
+     self.write_string(value);
+ }
+
+ /// Writes the UTF-8 bytes of `value` via `write_bytes`.
+ pub fn write_string(&mut self, value: &str) {
+     let utf8 = value.as_bytes();
+     self.write_bytes(utf8);
+ }
+
+ pub fn write_short(&mut self, value: i16) {
+ self.write_raw(&value.to_be_bytes());
Review Comment:
 @luoyuxia are parts of the compacted row persisted locally as a file or sent
 to the server? If so, I think we need to enforce little-endianness to ensure
 consistent results when reading on different machines (a mismatch is unlikely,
 as most machines are LE, but it is possible if we use native endianness on the
 Rust side). The write_integer method implementation on the Java side (and also
 within this Rust file) enforces LE.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]