This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 7dc447d  feat: Introduce CompactedRowEncoder (#161)
7dc447d is described below

commit 7dc447dba96fe6e13c7048f0aadea87cd5e7b148
Author: Keith Lee <[email protected]>
AuthorDate: Thu Jan 15 15:45:41 2026 +0000

    feat: Introduce CompactedRowEncoder (#161)
---
 crates/fluss/src/metadata/datatype.rs              | 10 ++-
 crates/fluss/src/record/kv/kv_record_batch.rs      | 13 ++--
 .../fluss/src/record/kv/kv_record_batch_builder.rs | 51 +++++++------
 crates/fluss/src/row/compacted/compacted_row.rs    | 41 +++++++----
 .../src/row/compacted/compacted_row_reader.rs      | 18 ++++-
 .../src/row/compacted/compacted_row_writer.rs      | 59 ++++++++-------
 .../fluss/src/row/encode/compacted_row_encoder.rs  | 83 ++++++++++++++++++++++
 crates/fluss/src/row/encode/mod.rs                 | 68 +++++++++++++++++-
 crates/fluss/src/row/mod.rs                        |  4 +-
 9 files changed, 263 insertions(+), 84 deletions(-)

diff --git a/crates/fluss/src/metadata/datatype.rs 
b/crates/fluss/src/metadata/datatype.rs
index dc1f407..f157466 100644
--- a/crates/fluss/src/metadata/datatype.rs
+++ b/crates/fluss/src/metadata/datatype.rs
@@ -682,11 +682,11 @@ impl Default for BytesType {
 }
 
 impl BytesType {
-    pub fn new() -> Self {
+    pub const fn new() -> Self {
         Self::with_nullable(true)
     }
 
-    pub fn with_nullable(nullable: bool) -> Self {
+    pub const fn with_nullable(nullable: bool) -> Self {
         Self { nullable }
     }
 
@@ -859,6 +859,10 @@ impl RowType {
         self.fields.iter().position(|f| f.name == field_name)
     }
 
+    pub fn field_types(&self) -> impl Iterator<Item = &DataType> + '_ {
+        self.fields.iter().map(|f| &f.data_type)
+    }
+
     pub fn get_field_names(&self) -> Vec<&str> {
         self.fields.iter().map(|f| f.name.as_str()).collect()
     }
@@ -931,7 +935,7 @@ impl DataTypes {
         DataType::Binary(BinaryType::new(length))
     }
 
-    pub fn bytes() -> DataType {
+    pub const fn bytes() -> DataType {
         DataType::Bytes(BytesType::new())
     }
 
diff --git a/crates/fluss/src/record/kv/kv_record_batch.rs 
b/crates/fluss/src/record/kv/kv_record_batch.rs
index fdd4ad7..6ead642 100644
--- a/crates/fluss/src/record/kv/kv_record_batch.rs
+++ b/crates/fluss/src/record/kv/kv_record_batch.rs
@@ -321,8 +321,10 @@ impl Iterator for KvRecordIterator {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::metadata::KvFormat;
+    use crate::metadata::{DataTypes, KvFormat};
     use crate::record::kv::{CURRENT_KV_MAGIC_VALUE, KvRecordBatchBuilder};
+    use crate::row::binary::BinaryWriter;
+    use crate::row::compacted::CompactedRow;
     use bytes::{BufMut, BytesMut};
 
     #[test]
@@ -363,12 +365,13 @@ mod tests {
         let key1 = b"key1";
         let mut value1_writer = CompactedRowWriter::new(1);
         value1_writer.write_bytes(&[1, 2, 3, 4, 5]);
-        builder.append_row(key1, Some(&value1_writer)).unwrap();
+
+        let data_types = &[DataTypes::bytes()];
+        let row = &CompactedRow::from_bytes(data_types, 
value1_writer.buffer());
+        builder.append_row(key1, Some(row)).unwrap();
 
         let key2 = b"key2";
-        builder
-            .append_row::<CompactedRowWriter>(key2, None)
-            .unwrap();
+        builder.append_row::<CompactedRow>(key2, None).unwrap();
 
         let bytes = builder.build().unwrap();
 
diff --git a/crates/fluss/src/record/kv/kv_record_batch_builder.rs 
b/crates/fluss/src/record/kv/kv_record_batch_builder.rs
index 773c778..7d1a797 100644
--- a/crates/fluss/src/record/kv/kv_record_batch_builder.rs
+++ b/crates/fluss/src/record/kv/kv_record_batch_builder.rs
@@ -317,13 +317,14 @@ impl Drop for KvRecordBatchBuilder {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::row::compacted::CompactedRowWriter;
+    use crate::metadata::{DataType, DataTypes};
+    use crate::row::binary::BinaryWriter;
+    use crate::row::compacted::{CompactedRow, CompactedRowWriter};
 
     // Helper function to create a CompactedRowWriter with a single bytes 
field for testing
-    fn create_test_row(data: &[u8]) -> CompactedRowWriter {
-        let mut writer = CompactedRowWriter::new(1);
-        writer.write_bytes(data);
-        writer
+    fn create_test_row(data: &[u8]) -> CompactedRow<'_> {
+        const DATA_TYPE: &[DataType] = &[DataTypes::bytes()];
+        CompactedRow::from_bytes(DATA_TYPE, data)
     }
 
     #[test]
@@ -349,10 +350,8 @@ mod tests {
         builder.append_row(key1, Some(&value1)).unwrap();
 
         let key2 = b"key2";
-        assert!(builder.has_room_for_row::<CompactedRowWriter>(key2, None));
-        builder
-            .append_row::<CompactedRowWriter>(key2, None)
-            .unwrap();
+        assert!(builder.has_room_for_row::<CompactedRow>(key2, None));
+        builder.append_row::<CompactedRow>(key2, None).unwrap();
 
         // Test close and build
         builder.close().unwrap();
@@ -373,11 +372,7 @@ mod tests {
         let value = create_test_row(b"value");
         builder.append_row(b"key", Some(&value)).unwrap();
         builder.abort();
-        assert!(
-            builder
-                .append_row::<CompactedRowWriter>(b"key2", None)
-                .is_err()
-        );
+        assert!(builder.append_row::<CompactedRow>(b"key2", None).is_err());
         assert!(builder.build().is_err());
         assert!(builder.close().is_err());
 
@@ -386,11 +381,7 @@ mod tests {
         let value = create_test_row(b"value");
         builder.append_row(b"key", Some(&value)).unwrap();
         builder.close().unwrap();
-        assert!(
-            builder
-                .append_row::<CompactedRowWriter>(b"key2", None)
-                .is_err()
-        ); // Can't append after close
+        assert!(builder.append_row::<CompactedRow>(b"key2", None).is_err()); 
// Can't append after close
         assert!(builder.build().is_ok()); // But can still build
     }
 
@@ -510,23 +501,26 @@ mod tests {
         row_writer1.write_int(42);
         row_writer1.write_string("hello");
 
+        let data_types = &[DataTypes::int(), DataTypes::string()];
+        let row1 = &CompactedRow::from_bytes(data_types, row_writer1.buffer());
+
         let key1 = b"key1";
-        assert!(builder.has_room_for_row(key1, Some(&row_writer1)));
-        builder.append_row(key1, Some(&row_writer1)).unwrap();
+        assert!(builder.has_room_for_row(key1, Some(row1)));
+        builder.append_row(key1, Some(row1)).unwrap();
 
         // Create and append second record
         let mut row_writer2 = CompactedRowWriter::new(2);
         row_writer2.write_int(100);
         row_writer2.write_string("world");
 
+        let row2 = &CompactedRow::from_bytes(data_types, row_writer2.buffer());
+
         let key2 = b"key2";
-        builder.append_row(key2, Some(&row_writer2)).unwrap();
+        builder.append_row(key2, Some(row2)).unwrap();
 
         // Append a deletion record
         let key3 = b"key3";
-        builder
-            .append_row::<CompactedRowWriter>(key3, None)
-            .unwrap();
+        builder.append_row::<CompactedRow>(key3, None).unwrap();
 
         // Build and verify
         builder.close().unwrap();
@@ -567,15 +561,18 @@ mod tests {
         let mut row_writer = CompactedRowWriter::new(1);
         row_writer.write_int(42);
 
+        let data_types = &[DataTypes::int()];
+        let row = &CompactedRow::from_bytes(data_types, row_writer.buffer());
+
         // INDEXED format should reject append_row
         let mut indexed_builder = KvRecordBatchBuilder::new(1, 4096, 
KvFormat::INDEXED);
-        let result = indexed_builder.append_row(b"key", Some(&row_writer));
+        let result = indexed_builder.append_row(b"key", Some(row));
         assert!(result.is_err());
         assert_eq!(result.unwrap_err().kind(), io::ErrorKind::InvalidInput);
 
         // COMPACTED format should accept append_row
         let mut compacted_builder = KvRecordBatchBuilder::new(1, 4096, 
KvFormat::COMPACTED);
-        let result = compacted_builder.append_row(b"key", Some(&row_writer));
+        let result = compacted_builder.append_row(b"key", Some(row));
         assert!(result.is_ok());
     }
 }
diff --git a/crates/fluss/src/row/compacted/compacted_row.rs 
b/crates/fluss/src/row/compacted/compacted_row.rs
index 481f9be..9ff3b5f 100644
--- a/crates/fluss/src/row/compacted/compacted_row.rs
+++ b/crates/fluss/src/row/compacted/compacted_row.rs
@@ -15,11 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::sync::OnceLock;
-
 use crate::metadata::DataType;
 use crate::row::compacted::compacted_row_reader::{CompactedRowDeserializer, 
CompactedRowReader};
-use crate::row::{GenericRow, InternalRow};
+use crate::row::{BinaryRow, GenericRow, InternalRow};
+use std::sync::{Arc, OnceLock};
 
 // Reference implementation:
 // 
https://github.com/apache/fluss/blob/main/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRow.java
@@ -28,9 +27,9 @@ pub struct CompactedRow<'a> {
     arity: usize,
     size_in_bytes: usize,
     decoded_row: OnceLock<GenericRow<'a>>,
-    deserializer: CompactedRowDeserializer<'a>,
+    deserializer: Arc<CompactedRowDeserializer<'a>>,
     reader: CompactedRowReader<'a>,
-    data_types: &'a [DataType],
+    data: &'a [u8],
 }
 
 pub fn calculate_bit_set_width_in_bytes(arity: usize) -> usize {
@@ -40,15 +39,25 @@ pub fn calculate_bit_set_width_in_bytes(arity: usize) -> 
usize {
 #[allow(dead_code)]
 impl<'a> CompactedRow<'a> {
     pub fn from_bytes(data_types: &'a [DataType], data: &'a [u8]) -> Self {
-        let arity = data_types.len();
-        let size = data.len();
+        Self::deserialize(
+            Arc::new(CompactedRowDeserializer::new(data_types)),
+            data_types.len(),
+            data,
+        )
+    }
+
+    pub fn deserialize(
+        deserializer: Arc<CompactedRowDeserializer<'a>>,
+        arity: usize,
+        data: &'a [u8],
+    ) -> Self {
         Self {
             arity,
-            size_in_bytes: size,
+            size_in_bytes: data.len(),
             decoded_row: OnceLock::new(),
-            deserializer: CompactedRowDeserializer::new(data_types),
-            reader: CompactedRowReader::new(arity, data, 0, size),
-            data_types,
+            deserializer: Arc::clone(&deserializer),
+            reader: CompactedRowReader::new(arity, data, 0, data.len()),
+            data,
         }
     }
 
@@ -62,6 +71,12 @@ impl<'a> CompactedRow<'a> {
     }
 }
 
+impl BinaryRow for CompactedRow<'_> {
+    fn as_bytes(&self) -> &[u8] {
+        self.data
+    }
+}
+
 #[allow(dead_code)]
 impl<'a> InternalRow for CompactedRow<'a> {
     fn get_field_count(&self) -> usize {
@@ -69,7 +84,7 @@ impl<'a> InternalRow for CompactedRow<'a> {
     }
 
     fn is_null_at(&self, pos: usize) -> bool {
-        self.data_types[pos].is_nullable() && self.reader.is_null_at(pos)
+        self.deserializer.get_data_types()[pos].is_nullable() && 
self.reader.is_null_at(pos)
     }
 
     fn get_boolean(&self, pos: usize) -> bool {
@@ -120,6 +135,8 @@ impl<'a> InternalRow for CompactedRow<'a> {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::row::binary::BinaryWriter;
+
     use crate::metadata::{
         BigIntType, BooleanType, BytesType, DoubleType, FloatType, IntType, 
SmallIntType,
         StringType, TinyIntType,
diff --git a/crates/fluss/src/row/compacted/compacted_row_reader.rs 
b/crates/fluss/src/row/compacted/compacted_row_reader.rs
index 5ec2608..9ce5095 100644
--- a/crates/fluss/src/row/compacted/compacted_row_reader.rs
+++ b/crates/fluss/src/row/compacted/compacted_row_reader.rs
@@ -21,17 +21,31 @@ use crate::{
     row::{Datum, GenericRow, 
compacted::compacted_row_writer::CompactedRowWriter},
     util::varint::{read_unsigned_varint_at, read_unsigned_varint_u64_at},
 };
+use std::borrow::Cow;
 use std::str::from_utf8;
 
 #[allow(dead_code)]
+#[derive(Clone)]
 pub struct CompactedRowDeserializer<'a> {
-    schema: &'a [DataType],
+    schema: Cow<'a, [DataType]>,
 }
 
 #[allow(dead_code)]
 impl<'a> CompactedRowDeserializer<'a> {
     pub fn new(schema: &'a [DataType]) -> Self {
-        Self { schema }
+        Self {
+            schema: Cow::Borrowed(schema),
+        }
+    }
+
+    pub fn new_from_owned(schema: Vec<DataType>) -> Self {
+        Self {
+            schema: Cow::Owned(schema),
+        }
+    }
+
+    pub fn get_data_types(&self) -> &[DataType] {
+        self.schema.as_ref()
     }
 
     pub fn deserialize(&self, reader: &CompactedRowReader<'a>) -> 
GenericRow<'a> {
diff --git a/crates/fluss/src/row/compacted/compacted_row_writer.rs 
b/crates/fluss/src/row/compacted/compacted_row_writer.rs
index 63b32a3..c130e94 100644
--- a/crates/fluss/src/row/compacted/compacted_row_writer.rs
+++ b/crates/fluss/src/row/compacted/compacted_row_writer.rs
@@ -15,12 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use bytes::{Bytes, BytesMut};
-use std::cmp;
-
-use crate::row::BinaryRow;
+use crate::row::binary::BinaryWriter;
 use crate::row::compacted::compacted_row::calculate_bit_set_width_in_bytes;
 use crate::util::varint::{write_unsigned_varint_to_slice, 
write_unsigned_varint_u64_to_slice};
+use bytes::{Bytes, BytesMut};
+use std::cmp;
 
 // Writer for CompactedRow
 // Reference implementation:
@@ -51,11 +50,6 @@ impl CompactedRowWriter {
         }
     }
 
-    pub fn reset(&mut self) {
-        self.position = self.header_size_in_bytes;
-        self.buffer[..self.header_size_in_bytes].fill(0);
-    }
-
     pub fn position(&self) -> usize {
         self.position
     }
@@ -81,75 +75,78 @@ impl CompactedRowWriter {
         self.buffer[self.position..end].copy_from_slice(src);
         self.position = end;
     }
+}
+impl BinaryWriter for CompactedRowWriter {
+    fn reset(&mut self) {
+        self.position = self.header_size_in_bytes;
+        self.buffer[..self.header_size_in_bytes].fill(0);
+    }
 
-    pub fn set_null_at(&mut self, pos: usize) {
+    fn set_null_at(&mut self, pos: usize) {
         let byte_index = pos >> 3;
         let bit = pos & 7;
         debug_assert!(byte_index < self.header_size_in_bytes);
         self.buffer[byte_index] |= 1u8 << bit;
     }
 
-    pub fn write_boolean(&mut self, value: bool) {
+    fn write_boolean(&mut self, value: bool) {
         let b = if value { 1u8 } else { 0u8 };
         self.write_raw(&[b]);
     }
 
-    pub fn write_byte(&mut self, value: u8) {
+    fn write_byte(&mut self, value: u8) {
         self.write_raw(&[value]);
     }
 
-    pub fn write_binary(&mut self, bytes: &[u8], length: usize) {
-        // TODO: currently, we encoding BINARY(length) as the same with BYTES, 
the length info can
-        //  be omitted and the bytes length should be enforced in the future.
-        self.write_bytes(&bytes[..length.min(bytes.len())]);
-    }
-
-    pub fn write_bytes(&mut self, value: &[u8]) {
+    fn write_bytes(&mut self, value: &[u8]) {
         let len_i32 =
             i32::try_from(value.len()).expect("byte slice too large to encode 
length as i32");
         self.write_int(len_i32);
         self.write_raw(value);
     }
 
-    pub fn write_char(&mut self, value: &str, _length: usize) {
+    fn write_char(&mut self, value: &str, _length: usize) {
         // TODO: currently, we encoding CHAR(length) as the same with STRING, 
the length info can be
         //  omitted and the bytes length should be enforced in the future.
         self.write_string(value);
     }
 
-    pub fn write_string(&mut self, value: &str) {
+    fn write_string(&mut self, value: &str) {
         self.write_bytes(value.as_ref());
     }
 
-    pub fn write_short(&mut self, value: i16) {
+    fn write_short(&mut self, value: i16) {
         self.write_raw(&value.to_ne_bytes());
     }
 
-    pub fn write_int(&mut self, value: i32) {
+    fn write_int(&mut self, value: i32) {
         self.ensure_capacity(Self::MAX_INT_SIZE);
         let bytes_written =
             write_unsigned_varint_to_slice(value as u32, &mut 
self.buffer[self.position..]);
         self.position += bytes_written;
     }
 
-    pub fn write_long(&mut self, value: i64) {
+    fn write_long(&mut self, value: i64) {
         self.ensure_capacity(Self::MAX_LONG_SIZE);
         let bytes_written =
             write_unsigned_varint_u64_to_slice(value as u64, &mut 
self.buffer[self.position..]);
         self.position += bytes_written;
     }
-
-    pub fn write_float(&mut self, value: f32) {
+    fn write_float(&mut self, value: f32) {
         self.write_raw(&value.to_ne_bytes());
     }
 
-    pub fn write_double(&mut self, value: f64) {
+    fn write_double(&mut self, value: f64) {
         self.write_raw(&value.to_ne_bytes());
     }
-}
 
-impl BinaryRow for CompactedRowWriter {
-    fn as_bytes(&self) -> &[u8] {
-        self.buffer()
+    fn write_binary(&mut self, bytes: &[u8], length: usize) {
+        // TODO: currently, we encoding BINARY(length) as the same with BYTES, 
the length info can
+        //  be omitted and the bytes length should be enforced in the future.
+        self.write_bytes(&bytes[..length.min(bytes.len())]);
+    }
+
+    fn complete(&mut self) {
+        // do nothing
     }
 }
diff --git a/crates/fluss/src/row/encode/compacted_row_encoder.rs 
b/crates/fluss/src/row/encode/compacted_row_encoder.rs
new file mode 100644
index 0000000..fc39bb7
--- /dev/null
+++ b/crates/fluss/src/row/encode/compacted_row_encoder.rs
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::Error::IllegalArgument;
+use crate::error::Result;
+use crate::metadata::DataType;
+use crate::row::Datum;
+use crate::row::binary::{BinaryRowFormat, BinaryWriter, ValueWriter};
+use crate::row::compacted::{CompactedRow, CompactedRowDeserializer, 
CompactedRowWriter};
+use crate::row::encode::{BinaryRow, RowEncoder};
+use std::sync::Arc;
+
+#[allow(dead_code)]
+pub struct CompactedRowEncoder<'a> {
+    arity: usize,
+    writer: CompactedRowWriter,
+    field_writers: Vec<ValueWriter>,
+    compacted_row_deserializer: Arc<CompactedRowDeserializer<'a>>,
+}
+
+impl<'a> CompactedRowEncoder<'a> {
+    pub fn new(field_data_types: Vec<DataType>) -> Result<Self> {
+        let field_writers = field_data_types
+            .iter()
+            .map(|d| ValueWriter::create_value_writer(d, 
Some(&BinaryRowFormat::Compacted)))
+            .collect::<Result<Vec<_>>>()?;
+
+        Ok(Self {
+            arity: field_data_types.len(),
+            writer: CompactedRowWriter::new(field_data_types.len()),
+            field_writers,
+            compacted_row_deserializer: 
Arc::new(CompactedRowDeserializer::new_from_owned(
+                field_data_types,
+            )),
+        })
+    }
+}
+
+impl RowEncoder for CompactedRowEncoder<'_> {
+    fn start_new_row(&mut self) -> Result<()> {
+        self.writer.reset();
+        Ok(())
+    }
+
+    fn encode_field(&mut self, pos: usize, value: Datum) -> Result<()> {
+        self.field_writers
+            .get(pos)
+            .ok_or_else(|| IllegalArgument {
+                message: format!(
+                    "invalid position {} when attempting to encode value {}",
+                    pos, value
+                ),
+            })?
+            .write_value(&mut self.writer, pos, &value)
+    }
+
+    fn finish_row(&mut self) -> Result<impl BinaryRow> {
+        Ok(CompactedRow::deserialize(
+            Arc::clone(&self.compacted_row_deserializer),
+            self.arity,
+            self.writer.buffer(),
+        ))
+    }
+
+    fn close(&mut self) -> Result<()> {
+        // do nothing
+        Ok(())
+    }
+}
diff --git a/crates/fluss/src/row/encode/mod.rs 
b/crates/fluss/src/row/encode/mod.rs
index 6c6eed9..34863ab 100644
--- a/crates/fluss/src/row/encode/mod.rs
+++ b/crates/fluss/src/row/encode/mod.rs
@@ -16,11 +16,13 @@
 // under the License.
 
 mod compacted_key_encoder;
+mod compacted_row_encoder;
 
 use crate::error::Result;
-use crate::metadata::{DataLakeFormat, RowType};
-use crate::row::InternalRow;
+use crate::metadata::{DataLakeFormat, DataType, KvFormat, RowType};
 use crate::row::encode::compacted_key_encoder::CompactedKeyEncoder;
+use crate::row::encode::compacted_row_encoder::CompactedRowEncoder;
+use crate::row::{BinaryRow, Datum, InternalRow};
 use bytes::Bytes;
 
 /// An interface for encoding key of row into bytes.
@@ -62,3 +64,65 @@ impl dyn KeyEncoder {
         }
     }
 }
+
+/// An encoder to write [`BinaryRow`]. It's used to write row
+/// multi-times one by one. When writing a new row:
+///
+/// 1. call method [`RowEncoder::start_new_row()`] to start the writing.
+/// 2. call method [`RowEncoder::encode_field()`] to write the row's field.
+/// 3. call method [`RowEncoder::finishRow()`] to finish the writing and get 
the written row.
+#[allow(dead_code)]
+pub trait RowEncoder {
+    /// Start to write a new row.
+    ///
+    /// # Returns
+    /// * Ok(()) if successful
+    fn start_new_row(&mut self) -> Result<()>;
+
+    /// Write the row's field in given pos with given value.
+    ///
+    /// # Arguments
+    /// * pos - the position of the field to write.
+    /// * value - the value of the field to write.
+    ///
+    /// # Returns
+    /// * Ok(()) if successful
+    fn encode_field(&mut self, pos: usize, value: Datum) -> Result<()>;
+
+    /// Finish write the row, returns the written row.
+    ///
+    /// Note that returned row borrows from [`RowEncoder`]'s internal buffer 
which is reused for subsequent rows
+    /// [`RowEncoder::start_new_row()`] should only be called after the 
returned row goes out of scope.
+    ///
+    /// # Returns
+    /// * the written row
+    fn finish_row(&mut self) -> Result<impl BinaryRow>;
+
+    /// Closes the row encoder
+    ///
+    /// # Returns
+    /// * Ok(()) if successful
+    fn close(&mut self) -> Result<()>;
+}
+
+#[allow(dead_code)]
+pub struct RowEncoderFactory {}
+
+#[allow(dead_code)]
+impl RowEncoderFactory {
+    pub fn create(kv_format: KvFormat, row_type: &RowType) -> Result<impl 
RowEncoder> {
+        Self::create_for_field_types(kv_format, 
row_type.field_types().cloned().collect())
+    }
+
+    pub fn create_for_field_types(
+        kv_format: KvFormat,
+        field_data_types: Vec<DataType>,
+    ) -> Result<impl RowEncoder> {
+        match kv_format {
+            KvFormat::INDEXED => {
+                todo!()
+            }
+            KvFormat::COMPACTED => CompactedRowEncoder::new(field_data_types),
+        }
+    }
+}
diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs
index 144d64f..4996063 100644
--- a/crates/fluss/src/row/mod.rs
+++ b/crates/fluss/src/row/mod.rs
@@ -19,7 +19,7 @@ mod column;
 
 mod datum;
 
-mod binary;
+pub mod binary;
 pub mod compacted;
 mod encode;
 mod field_getter;
@@ -27,7 +27,7 @@ mod field_getter;
 pub use column::*;
 pub use datum::*;
 
-pub trait BinaryRow {
+pub trait BinaryRow: InternalRow {
     /// Returns the binary representation of this row as a byte slice.
     fn as_bytes(&self) -> &[u8];
 }

Reply via email to