This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new bddce62 feat: implement CompactedRowWriter (#121)
bddce62 is described below
commit bddce626096ca8d88f712cf5f41eaff107433c5b
Author: Kelvin Wu <[email protected]>
AuthorDate: Sat Jan 3 14:45:54 2026 +0800
feat: implement CompactedRowWriter (#121)
---
.../src/row/compacted/compacted_row_writer.rs | 153 +++++++++++++++++++++
crates/fluss/src/row/compacted/mod.rs | 1 +
crates/fluss/src/row/mod.rs | 2 +
3 files changed, 156 insertions(+)
diff --git a/crates/fluss/src/row/compacted/compacted_row_writer.rs b/crates/fluss/src/row/compacted/compacted_row_writer.rs
new file mode 100644
index 0000000..7c0adde
--- /dev/null
+++ b/crates/fluss/src/row/compacted/compacted_row_writer.rs
@@ -0,0 +1,153 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytes::{Bytes, BytesMut};
+use std::cmp;
+
+/// Writer for `CompactedRow`.
+///
+/// Values are appended to an internal, zero-filled buffer that grows on demand. The front of
+/// the buffer is a null bitset with one bit per field; field data follows it.
+///
+/// Reference implementation:
+/// <https://github.com/apache/fluss/blob/d4a72fad240d4b81563aaf83fa3b09b5058674ed/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRowWriter.java#L71>
+pub struct CompactedRowWriter {
+    /// Size in bytes of the null bitset at the start of `buffer` (one bit per field, rounded
+    /// up to whole bytes).
+    header_size_in_bytes: usize,
+    /// Offset in `buffer` at which the next value is written; everything before it is the
+    /// row produced so far.
+    position: usize,
+    /// Backing storage. Its `len()` (not just its capacity) is kept ahead of `position` so
+    /// writes can index straight into it.
+    buffer: BytesMut,
+}
+
+impl CompactedRowWriter {
+    /// Upper bound on the bytes emitted by [`write_int`](Self::write_int): 32 bits split into
+    /// 7-bit groups need at most five bytes.
+    pub const MAX_INT_SIZE: usize = 5;
+    /// Upper bound on the bytes emitted by [`write_long`](Self::write_long): 64 bits split into
+    /// 7-bit groups need at most ten bytes.
+    pub const MAX_LONG_SIZE: usize = 10;
+
+    /// Creates a writer for rows with `field_count` fields.
+    ///
+    /// The buffer starts zero-filled at 64 bytes (or the header size, if that is larger) and
+    /// the write position starts immediately after the null-bitset header.
+    pub fn new(field_count: usize) -> Self {
+        // Width of the null bitset in bytes: one bit per field, rounded up.
+        // TODO: this computation should live in CompactedRow.
+        let header_size = (field_count + 7) / 8;
+        let cap = cmp::max(64, header_size);
+
+        let mut buffer = BytesMut::with_capacity(cap);
+        // Give the buffer a real length (not just capacity) so it can be indexed directly.
+        buffer.resize(cap, 0);
+
+        Self {
+            header_size_in_bytes: header_size,
+            position: header_size,
+            buffer,
+        }
+    }
+
+    /// Prepares the writer for a new row: rewinds to just after the header and clears every
+    /// null bit.
+    ///
+    /// Bytes past the header are left as they are; later writes overwrite them, and
+    /// [`buffer`](Self::buffer) / [`to_bytes`](Self::to_bytes) never expose anything beyond
+    /// the current position.
+    pub fn reset(&mut self) {
+        self.position = self.header_size_in_bytes;
+        self.buffer[..self.header_size_in_bytes].fill(0);
+    }
+
+    /// Returns the number of bytes written so far, header included.
+    pub fn position(&self) -> usize {
+        self.position
+    }
+
+    /// Returns the row written so far (header plus field data) as a borrowed slice.
+    pub fn buffer(&self) -> &[u8] {
+        &self.buffer[..self.position]
+    }
+
+    /// Copies the row written so far into a new, independent [`Bytes`].
+    pub fn to_bytes(&self) -> Bytes {
+        Bytes::copy_from_slice(&self.buffer[..self.position])
+    }
+
+    /// Grows the buffer, if necessary, so that at least `need_len` bytes can be written at
+    /// the current position.
+    ///
+    /// Growth at least doubles the current length, and the new bytes are zero-filled.
+    fn ensure_capacity(&mut self, need_len: usize) {
+        if (self.buffer.len() - self.position) < need_len {
+            let new_len = cmp::max(self.buffer.len() * 2, self.buffer.len() + need_len);
+            self.buffer.resize(new_len, 0);
+        }
+    }
+
+    /// Appends `src` verbatim and advances the position past it.
+    fn write_raw(&mut self, src: &[u8]) {
+        let end = self.position + src.len();
+        self.ensure_capacity(src.len());
+        self.buffer[self.position..end].copy_from_slice(src);
+        self.position = end;
+    }
+
+    /// Marks field `pos` as null by setting bit `pos % 8` of header byte `pos / 8`.
+    ///
+    /// # Panics
+    ///
+    /// Debug builds panic if `pos` lies outside the header. Release builds only have the
+    /// buffer's own bounds check, so an out-of-range `pos` that still falls inside the buffer
+    /// would flip a bit in the row data instead.
+    pub fn set_null_at(&mut self, pos: usize) {
+        let byte_index = pos >> 3;
+        let bit = pos & 7;
+        debug_assert!(byte_index < self.header_size_in_bytes);
+        self.buffer[byte_index] |= 1u8 << bit;
+    }
+
+    /// Appends a boolean as a single byte: `1` for `true`, `0` for `false`.
+    pub fn write_boolean(&mut self, value: bool) {
+        self.write_raw(&[u8::from(value)]);
+    }
+
+    /// Appends a single byte.
+    pub fn write_byte(&mut self, value: u8) {
+        self.write_raw(&[value]);
+    }
+
+    /// Appends at most `length` bytes of `bytes`, encoded like
+    /// [`write_bytes`](Self::write_bytes).
+    pub fn write_binary(&mut self, bytes: &[u8], length: usize) {
+        // TODO: currently BINARY(length) is encoded the same way as BYTES; the length prefix
+        // could be omitted and the byte length should be enforced in the future.
+        self.write_bytes(&bytes[..length.min(bytes.len())]);
+    }
+
+    /// Appends `value` as a variable-length length prefix (see [`write_int`](Self::write_int))
+    /// followed by the raw bytes.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `value.len()` does not fit in an `i32`.
+    pub fn write_bytes(&mut self, value: &[u8]) {
+        let len_i32 =
+            i32::try_from(value.len()).expect("byte slice too large to encode length as i32");
+        self.write_int(len_i32);
+        self.write_raw(value);
+    }
+
+    /// Appends `value`, encoded like [`write_string`](Self::write_string).
+    ///
+    /// `length` is accepted for the declared CHAR width but is not used yet.
+    pub fn write_char(&mut self, value: &str, length: usize) {
+        // TODO: currently CHAR(length) is encoded the same way as STRING; the length prefix
+        // could be omitted and the byte length should be enforced in the future.
+        let _ = length; // Deliberately unused until the width is enforced (see TODO).
+        self.write_string(value);
+    }
+
+    /// Appends the UTF-8 bytes of `value`, length-prefixed like
+    /// [`write_bytes`](Self::write_bytes).
+    pub fn write_string(&mut self, value: &str) {
+        self.write_bytes(value.as_bytes());
+    }
+
+    /// Appends a 16-bit integer as two bytes in the platform's native byte order.
+    // NOTE(review): native byte order makes the output platform-dependent; presumably this
+    // mirrors the reference implementation -- confirm.
+    pub fn write_short(&mut self, value: i16) {
+        self.write_raw(&value.to_ne_bytes());
+    }
+
+    /// Appends a 32-bit integer in variable-length form: 7 bits per byte, least-significant
+    /// group first, with the high bit set on every byte except the last.
+    ///
+    /// The value is reinterpreted as unsigned, so negative numbers always occupy
+    /// [`MAX_INT_SIZE`](Self::MAX_INT_SIZE) bytes.
+    pub fn write_int(&mut self, value: i32) {
+        self.ensure_capacity(Self::MAX_INT_SIZE);
+        // Go through `u32` first so a negative value is not sign-extended to 64 bits.
+        self.write_varint(u64::from(value as u32));
+    }
+
+    /// Appends a 64-bit integer in the same variable-length form as
+    /// [`write_int`](Self::write_int).
+    ///
+    /// The value is reinterpreted as unsigned, so negative numbers always occupy
+    /// [`MAX_LONG_SIZE`](Self::MAX_LONG_SIZE) bytes.
+    pub fn write_long(&mut self, value: i64) {
+        self.ensure_capacity(Self::MAX_LONG_SIZE);
+        self.write_varint(value as u64);
+    }
+
+    /// Emits `v` in 7-bit groups, low group first. The caller must already have reserved
+    /// enough room via `ensure_capacity`.
+    fn write_varint(&mut self, mut v: u64) {
+        while (v & !0x7F) != 0 {
+            // `as u8` intentionally keeps only the low byte; the mask then keeps its low
+            // 7 bits and 0x80 flags that more bytes follow.
+            self.buffer[self.position] = ((v as u8) & 0x7F) | 0x80;
+            self.position += 1;
+            v >>= 7;
+        }
+        // Final group: fits in 7 bits, so the continuation bit stays clear.
+        self.buffer[self.position] = v as u8;
+        self.position += 1;
+    }
+
+    /// Appends a 32-bit float as four bytes in the platform's native byte order.
+    pub fn write_float(&mut self, value: f32) {
+        self.write_raw(&value.to_ne_bytes());
+    }
+
+    /// Appends a 64-bit float as eight bytes in the platform's native byte order.
+    pub fn write_double(&mut self, value: f64) {
+        self.write_raw(&value.to_ne_bytes());
+    }
+}
diff --git a/crates/fluss/src/row/compacted/mod.rs b/crates/fluss/src/row/compacted/mod.rs
new file mode 100644
index 0000000..b9bc66b
--- /dev/null
+++ b/crates/fluss/src/row/compacted/mod.rs
@@ -0,0 +1 @@
+mod compacted_row_writer;
diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs
index 01b89fc..86fdf90 100644
--- a/crates/fluss/src/row/mod.rs
+++ b/crates/fluss/src/row/mod.rs
@@ -19,6 +19,8 @@ mod column;
mod datum;
+mod compacted;
+
pub use column::*;
pub use datum::*;