This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new bddce62  feat: implement CompactedRowWriter (#121)
bddce62 is described below

commit bddce626096ca8d88f712cf5f41eaff107433c5b
Author: Kelvin Wu <[email protected]>
AuthorDate: Sat Jan 3 14:45:54 2026 +0800

    feat: implement CompactedRowWriter (#121)
---
 .../src/row/compacted/compacted_row_writer.rs      | 153 +++++++++++++++++++++
 crates/fluss/src/row/compacted/mod.rs              |   1 +
 crates/fluss/src/row/mod.rs                        |   2 +
 3 files changed, 156 insertions(+)

diff --git a/crates/fluss/src/row/compacted/compacted_row_writer.rs 
b/crates/fluss/src/row/compacted/compacted_row_writer.rs
new file mode 100644
index 0000000..7c0adde
--- /dev/null
+++ b/crates/fluss/src/row/compacted/compacted_row_writer.rs
@@ -0,0 +1,153 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytes::{Bytes, BytesMut};
+use std::cmp;
+
+// Writer for CompactedRow
+// Reference implementation:
+// 
https://github.com/apache/fluss/blob/d4a72fad240d4b81563aaf83fa3b09b5058674ed/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRowWriter.java#L71
+pub struct CompactedRowWriter {
+    header_size_in_bytes: usize,
+    position: usize,
+    buffer: BytesMut,
+}
+
+impl CompactedRowWriter {
+    pub const MAX_INT_SIZE: usize = 5;
+    pub const MAX_LONG_SIZE: usize = 10;
+
+    pub fn new(field_count: usize) -> Self {
+        // bitset width in bytes, it should be in CompactedRow
+        let header_size = (field_count + 7) / 8;
+        let cap = cmp::max(64, header_size);
+
+        let mut buffer = BytesMut::with_capacity(cap);
+        buffer.resize(cap, 0);
+
+        Self {
+            header_size_in_bytes: header_size,
+            position: header_size,
+            buffer,
+        }
+    }
+
+    pub fn reset(&mut self) {
+        self.position = self.header_size_in_bytes;
+        self.buffer[..self.header_size_in_bytes].fill(0);
+    }
+
+    pub fn position(&self) -> usize {
+        self.position
+    }
+
+    pub fn buffer(&self) -> &[u8] {
+        &self.buffer[..self.position]
+    }
+
+    pub fn to_bytes(&self) -> Bytes {
+        Bytes::copy_from_slice(&self.buffer[..self.position])
+    }
+
+    fn ensure_capacity(&mut self, need_len: usize) {
+        if (self.buffer.len() - self.position) < need_len {
+            let new_len = cmp::max(self.buffer.len() * 2, self.buffer.len() + 
need_len);
+            self.buffer.resize(new_len, 0);
+        }
+    }
+
+    fn write_raw(&mut self, src: &[u8]) {
+        let end = self.position + src.len();
+        self.ensure_capacity(src.len());
+        self.buffer[self.position..end].copy_from_slice(src);
+        self.position = end;
+    }
+
+    pub fn set_null_at(&mut self, pos: usize) {
+        let byte_index = pos >> 3;
+        let bit = pos & 7;
+        debug_assert!(byte_index < self.header_size_in_bytes);
+        self.buffer[byte_index] |= 1u8 << bit;
+    }
+
+    pub fn write_boolean(&mut self, value: bool) {
+        let b = if value { 1u8 } else { 0u8 };
+        self.write_raw(&[b]);
+    }
+
+    pub fn write_byte(&mut self, value: u8) {
+        self.write_raw(&[value as u8]);
+    }
+
+    pub fn write_binary(&mut self, bytes: &[u8], length: usize) {
+        // TODO: currently, we encoding BINARY(length) as the same with BYTES, 
the length info can
+        //  be omitted and the bytes length should be enforced in the future.
+        self.write_bytes(&bytes[..length.min(bytes.len())]);
+    }
+
+    pub fn write_bytes(&mut self, value: &[u8]) {
+        let len_i32 =
+            i32::try_from(value.len()).expect("byte slice too large to encode 
length as i32");
+        self.write_int(len_i32);
+        self.write_raw(value);
+    }
+
+    pub fn write_char(&mut self, value: &str, length: usize) {
+        // TODO: currently, we encoding CHAR(length) as the same with STRING, 
the length info can be
+        //  omitted and the bytes length should be enforced in the future.
+        self.write_string(value);
+    }
+
+    pub fn write_string(&mut self, value: &str) {
+        self.write_bytes(value.as_ref());
+    }
+
+    pub fn write_short(&mut self, value: i16) {
+        self.write_raw(&value.to_ne_bytes());
+    }
+
+    pub fn write_int(&mut self, value: i32) {
+        self.ensure_capacity(Self::MAX_INT_SIZE);
+        let mut v = value as u32;
+        while (v & !0x7F) != 0 {
+            self.buffer[self.position] = ((v as u8) & 0x7F) | 0x80;
+            self.position += 1;
+            v >>= 7;
+        }
+        self.buffer[self.position] = v as u8;
+        self.position += 1;
+    }
+    pub fn write_long(&mut self, value: i64) {
+        self.ensure_capacity(Self::MAX_LONG_SIZE);
+        let mut v = value as u64;
+        while (v & !0x7F) != 0 {
+            self.buffer[self.position] = ((v as u8) & 0x7F) | 0x80;
+            self.position += 1;
+            v >>= 7;
+        }
+        self.buffer[self.position] = v as u8;
+        self.position += 1;
+    }
+
+    pub fn write_float(&mut self, value: f32) {
+        self.write_raw(&value.to_ne_bytes());
+    }
+
+    pub fn write_double(&mut self, value: f64) {
+        self.write_raw(&value.to_ne_bytes());
+    }
+}
diff --git a/crates/fluss/src/row/compacted/mod.rs 
b/crates/fluss/src/row/compacted/mod.rs
new file mode 100644
index 0000000..b9bc66b
--- /dev/null
+++ b/crates/fluss/src/row/compacted/mod.rs
@@ -0,0 +1 @@
+mod compacted_row_writer;
diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs
index 01b89fc..86fdf90 100644
--- a/crates/fluss/src/row/mod.rs
+++ b/crates/fluss/src/row/mod.rs
@@ -19,6 +19,8 @@ mod column;
 
 mod datum;
 
+mod compacted;
+
 pub use column::*;
 pub use datum::*;
 

Reply via email to