This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 7dc447d feat: Introduce CompactedRowEncoder (#161)
7dc447d is described below
commit 7dc447dba96fe6e13c7048f0aadea87cd5e7b148
Author: Keith Lee <[email protected]>
AuthorDate: Thu Jan 15 15:45:41 2026 +0000
feat: Introduce CompactedRowEncoder (#161)
---
crates/fluss/src/metadata/datatype.rs | 10 ++-
crates/fluss/src/record/kv/kv_record_batch.rs | 13 ++--
.../fluss/src/record/kv/kv_record_batch_builder.rs | 51 +++++++------
crates/fluss/src/row/compacted/compacted_row.rs | 41 +++++++----
.../src/row/compacted/compacted_row_reader.rs | 18 ++++-
.../src/row/compacted/compacted_row_writer.rs | 59 ++++++++-------
.../fluss/src/row/encode/compacted_row_encoder.rs | 83 ++++++++++++++++++++++
crates/fluss/src/row/encode/mod.rs | 68 +++++++++++++++++-
crates/fluss/src/row/mod.rs | 4 +-
9 files changed, 263 insertions(+), 84 deletions(-)
diff --git a/crates/fluss/src/metadata/datatype.rs
b/crates/fluss/src/metadata/datatype.rs
index dc1f407..f157466 100644
--- a/crates/fluss/src/metadata/datatype.rs
+++ b/crates/fluss/src/metadata/datatype.rs
@@ -682,11 +682,11 @@ impl Default for BytesType {
}
impl BytesType {
- pub fn new() -> Self {
+ pub const fn new() -> Self {
Self::with_nullable(true)
}
- pub fn with_nullable(nullable: bool) -> Self {
+ pub const fn with_nullable(nullable: bool) -> Self {
Self { nullable }
}
@@ -859,6 +859,10 @@ impl RowType {
self.fields.iter().position(|f| f.name == field_name)
}
+ pub fn field_types(&self) -> impl Iterator<Item = &DataType> + '_ {
+ self.fields.iter().map(|f| &f.data_type)
+ }
+
pub fn get_field_names(&self) -> Vec<&str> {
self.fields.iter().map(|f| f.name.as_str()).collect()
}
@@ -931,7 +935,7 @@ impl DataTypes {
DataType::Binary(BinaryType::new(length))
}
- pub fn bytes() -> DataType {
+ pub const fn bytes() -> DataType {
DataType::Bytes(BytesType::new())
}
diff --git a/crates/fluss/src/record/kv/kv_record_batch.rs
b/crates/fluss/src/record/kv/kv_record_batch.rs
index fdd4ad7..6ead642 100644
--- a/crates/fluss/src/record/kv/kv_record_batch.rs
+++ b/crates/fluss/src/record/kv/kv_record_batch.rs
@@ -321,8 +321,10 @@ impl Iterator for KvRecordIterator {
#[cfg(test)]
mod tests {
use super::*;
- use crate::metadata::KvFormat;
+ use crate::metadata::{DataTypes, KvFormat};
use crate::record::kv::{CURRENT_KV_MAGIC_VALUE, KvRecordBatchBuilder};
+ use crate::row::binary::BinaryWriter;
+ use crate::row::compacted::CompactedRow;
use bytes::{BufMut, BytesMut};
#[test]
@@ -363,12 +365,13 @@ mod tests {
let key1 = b"key1";
let mut value1_writer = CompactedRowWriter::new(1);
value1_writer.write_bytes(&[1, 2, 3, 4, 5]);
- builder.append_row(key1, Some(&value1_writer)).unwrap();
+
+ let data_types = &[DataTypes::bytes()];
+ let row = &CompactedRow::from_bytes(data_types,
value1_writer.buffer());
+ builder.append_row(key1, Some(row)).unwrap();
let key2 = b"key2";
- builder
- .append_row::<CompactedRowWriter>(key2, None)
- .unwrap();
+ builder.append_row::<CompactedRow>(key2, None).unwrap();
let bytes = builder.build().unwrap();
diff --git a/crates/fluss/src/record/kv/kv_record_batch_builder.rs
b/crates/fluss/src/record/kv/kv_record_batch_builder.rs
index 773c778..7d1a797 100644
--- a/crates/fluss/src/record/kv/kv_record_batch_builder.rs
+++ b/crates/fluss/src/record/kv/kv_record_batch_builder.rs
@@ -317,13 +317,14 @@ impl Drop for KvRecordBatchBuilder {
#[cfg(test)]
mod tests {
use super::*;
- use crate::row::compacted::CompactedRowWriter;
+ use crate::metadata::{DataType, DataTypes};
+ use crate::row::binary::BinaryWriter;
+ use crate::row::compacted::{CompactedRow, CompactedRowWriter};
// Helper function to create a CompactedRowWriter with a single bytes
field for testing
- fn create_test_row(data: &[u8]) -> CompactedRowWriter {
- let mut writer = CompactedRowWriter::new(1);
- writer.write_bytes(data);
- writer
+ fn create_test_row(data: &[u8]) -> CompactedRow<'_> {
+ const DATA_TYPE: &[DataType] = &[DataTypes::bytes()];
+ CompactedRow::from_bytes(DATA_TYPE, data)
}
#[test]
@@ -349,10 +350,8 @@ mod tests {
builder.append_row(key1, Some(&value1)).unwrap();
let key2 = b"key2";
- assert!(builder.has_room_for_row::<CompactedRowWriter>(key2, None));
- builder
- .append_row::<CompactedRowWriter>(key2, None)
- .unwrap();
+ assert!(builder.has_room_for_row::<CompactedRow>(key2, None));
+ builder.append_row::<CompactedRow>(key2, None).unwrap();
// Test close and build
builder.close().unwrap();
@@ -373,11 +372,7 @@ mod tests {
let value = create_test_row(b"value");
builder.append_row(b"key", Some(&value)).unwrap();
builder.abort();
- assert!(
- builder
- .append_row::<CompactedRowWriter>(b"key2", None)
- .is_err()
- );
+ assert!(builder.append_row::<CompactedRow>(b"key2", None).is_err());
assert!(builder.build().is_err());
assert!(builder.close().is_err());
@@ -386,11 +381,7 @@ mod tests {
let value = create_test_row(b"value");
builder.append_row(b"key", Some(&value)).unwrap();
builder.close().unwrap();
- assert!(
- builder
- .append_row::<CompactedRowWriter>(b"key2", None)
- .is_err()
- ); // Can't append after close
+ assert!(builder.append_row::<CompactedRow>(b"key2", None).is_err());
// Can't append after close
assert!(builder.build().is_ok()); // But can still build
}
@@ -510,23 +501,26 @@ mod tests {
row_writer1.write_int(42);
row_writer1.write_string("hello");
+ let data_types = &[DataTypes::int(), DataTypes::string()];
+ let row1 = &CompactedRow::from_bytes(data_types, row_writer1.buffer());
+
let key1 = b"key1";
- assert!(builder.has_room_for_row(key1, Some(&row_writer1)));
- builder.append_row(key1, Some(&row_writer1)).unwrap();
+ assert!(builder.has_room_for_row(key1, Some(row1)));
+ builder.append_row(key1, Some(row1)).unwrap();
// Create and append second record
let mut row_writer2 = CompactedRowWriter::new(2);
row_writer2.write_int(100);
row_writer2.write_string("world");
+ let row2 = &CompactedRow::from_bytes(data_types, row_writer2.buffer());
+
let key2 = b"key2";
- builder.append_row(key2, Some(&row_writer2)).unwrap();
+ builder.append_row(key2, Some(row2)).unwrap();
// Append a deletion record
let key3 = b"key3";
- builder
- .append_row::<CompactedRowWriter>(key3, None)
- .unwrap();
+ builder.append_row::<CompactedRow>(key3, None).unwrap();
// Build and verify
builder.close().unwrap();
@@ -567,15 +561,18 @@ mod tests {
let mut row_writer = CompactedRowWriter::new(1);
row_writer.write_int(42);
+ let data_types = &[DataTypes::int()];
+ let row = &CompactedRow::from_bytes(data_types, row_writer.buffer());
+
// INDEXED format should reject append_row
let mut indexed_builder = KvRecordBatchBuilder::new(1, 4096,
KvFormat::INDEXED);
- let result = indexed_builder.append_row(b"key", Some(&row_writer));
+ let result = indexed_builder.append_row(b"key", Some(row));
assert!(result.is_err());
assert_eq!(result.unwrap_err().kind(), io::ErrorKind::InvalidInput);
// COMPACTED format should accept append_row
let mut compacted_builder = KvRecordBatchBuilder::new(1, 4096,
KvFormat::COMPACTED);
- let result = compacted_builder.append_row(b"key", Some(&row_writer));
+ let result = compacted_builder.append_row(b"key", Some(row));
assert!(result.is_ok());
}
}
diff --git a/crates/fluss/src/row/compacted/compacted_row.rs
b/crates/fluss/src/row/compacted/compacted_row.rs
index 481f9be..9ff3b5f 100644
--- a/crates/fluss/src/row/compacted/compacted_row.rs
+++ b/crates/fluss/src/row/compacted/compacted_row.rs
@@ -15,11 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-use std::sync::OnceLock;
-
use crate::metadata::DataType;
use crate::row::compacted::compacted_row_reader::{CompactedRowDeserializer,
CompactedRowReader};
-use crate::row::{GenericRow, InternalRow};
+use crate::row::{BinaryRow, GenericRow, InternalRow};
+use std::sync::{Arc, OnceLock};
// Reference implementation:
//
https://github.com/apache/fluss/blob/main/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRow.java
@@ -28,9 +27,9 @@ pub struct CompactedRow<'a> {
arity: usize,
size_in_bytes: usize,
decoded_row: OnceLock<GenericRow<'a>>,
- deserializer: CompactedRowDeserializer<'a>,
+ deserializer: Arc<CompactedRowDeserializer<'a>>,
reader: CompactedRowReader<'a>,
- data_types: &'a [DataType],
+ data: &'a [u8],
}
pub fn calculate_bit_set_width_in_bytes(arity: usize) -> usize {
@@ -40,15 +39,25 @@ pub fn calculate_bit_set_width_in_bytes(arity: usize) ->
usize {
#[allow(dead_code)]
impl<'a> CompactedRow<'a> {
pub fn from_bytes(data_types: &'a [DataType], data: &'a [u8]) -> Self {
- let arity = data_types.len();
- let size = data.len();
+ Self::deserialize(
+ Arc::new(CompactedRowDeserializer::new(data_types)),
+ data_types.len(),
+ data,
+ )
+ }
+
+ pub fn deserialize(
+ deserializer: Arc<CompactedRowDeserializer<'a>>,
+ arity: usize,
+ data: &'a [u8],
+ ) -> Self {
Self {
arity,
- size_in_bytes: size,
+ size_in_bytes: data.len(),
decoded_row: OnceLock::new(),
- deserializer: CompactedRowDeserializer::new(data_types),
- reader: CompactedRowReader::new(arity, data, 0, size),
- data_types,
+ deserializer: Arc::clone(&deserializer),
+ reader: CompactedRowReader::new(arity, data, 0, data.len()),
+ data,
}
}
@@ -62,6 +71,12 @@ impl<'a> CompactedRow<'a> {
}
}
+impl BinaryRow for CompactedRow<'_> {
+ fn as_bytes(&self) -> &[u8] {
+ self.data
+ }
+}
+
#[allow(dead_code)]
impl<'a> InternalRow for CompactedRow<'a> {
fn get_field_count(&self) -> usize {
@@ -69,7 +84,7 @@ impl<'a> InternalRow for CompactedRow<'a> {
}
fn is_null_at(&self, pos: usize) -> bool {
- self.data_types[pos].is_nullable() && self.reader.is_null_at(pos)
+ self.deserializer.get_data_types()[pos].is_nullable() &&
self.reader.is_null_at(pos)
}
fn get_boolean(&self, pos: usize) -> bool {
@@ -120,6 +135,8 @@ impl<'a> InternalRow for CompactedRow<'a> {
#[cfg(test)]
mod tests {
use super::*;
+ use crate::row::binary::BinaryWriter;
+
use crate::metadata::{
BigIntType, BooleanType, BytesType, DoubleType, FloatType, IntType,
SmallIntType,
StringType, TinyIntType,
diff --git a/crates/fluss/src/row/compacted/compacted_row_reader.rs
b/crates/fluss/src/row/compacted/compacted_row_reader.rs
index 5ec2608..9ce5095 100644
--- a/crates/fluss/src/row/compacted/compacted_row_reader.rs
+++ b/crates/fluss/src/row/compacted/compacted_row_reader.rs
@@ -21,17 +21,31 @@ use crate::{
row::{Datum, GenericRow,
compacted::compacted_row_writer::CompactedRowWriter},
util::varint::{read_unsigned_varint_at, read_unsigned_varint_u64_at},
};
+use std::borrow::Cow;
use std::str::from_utf8;
#[allow(dead_code)]
+#[derive(Clone)]
pub struct CompactedRowDeserializer<'a> {
- schema: &'a [DataType],
+ schema: Cow<'a, [DataType]>,
}
#[allow(dead_code)]
impl<'a> CompactedRowDeserializer<'a> {
pub fn new(schema: &'a [DataType]) -> Self {
- Self { schema }
+ Self {
+ schema: Cow::Borrowed(schema),
+ }
+ }
+
+ pub fn new_from_owned(schema: Vec<DataType>) -> Self {
+ Self {
+ schema: Cow::Owned(schema),
+ }
+ }
+
+ pub fn get_data_types(&self) -> &[DataType] {
+ self.schema.as_ref()
}
pub fn deserialize(&self, reader: &CompactedRowReader<'a>) ->
GenericRow<'a> {
diff --git a/crates/fluss/src/row/compacted/compacted_row_writer.rs
b/crates/fluss/src/row/compacted/compacted_row_writer.rs
index 63b32a3..c130e94 100644
--- a/crates/fluss/src/row/compacted/compacted_row_writer.rs
+++ b/crates/fluss/src/row/compacted/compacted_row_writer.rs
@@ -15,12 +15,11 @@
// specific language governing permissions and limitations
// under the License.
-use bytes::{Bytes, BytesMut};
-use std::cmp;
-
-use crate::row::BinaryRow;
+use crate::row::binary::BinaryWriter;
use crate::row::compacted::compacted_row::calculate_bit_set_width_in_bytes;
use crate::util::varint::{write_unsigned_varint_to_slice,
write_unsigned_varint_u64_to_slice};
+use bytes::{Bytes, BytesMut};
+use std::cmp;
// Writer for CompactedRow
// Reference implementation:
@@ -51,11 +50,6 @@ impl CompactedRowWriter {
}
}
- pub fn reset(&mut self) {
- self.position = self.header_size_in_bytes;
- self.buffer[..self.header_size_in_bytes].fill(0);
- }
-
pub fn position(&self) -> usize {
self.position
}
@@ -81,75 +75,78 @@ impl CompactedRowWriter {
self.buffer[self.position..end].copy_from_slice(src);
self.position = end;
}
+}
+impl BinaryWriter for CompactedRowWriter {
+ fn reset(&mut self) {
+ self.position = self.header_size_in_bytes;
+ self.buffer[..self.header_size_in_bytes].fill(0);
+ }
- pub fn set_null_at(&mut self, pos: usize) {
+ fn set_null_at(&mut self, pos: usize) {
let byte_index = pos >> 3;
let bit = pos & 7;
debug_assert!(byte_index < self.header_size_in_bytes);
self.buffer[byte_index] |= 1u8 << bit;
}
- pub fn write_boolean(&mut self, value: bool) {
+ fn write_boolean(&mut self, value: bool) {
let b = if value { 1u8 } else { 0u8 };
self.write_raw(&[b]);
}
- pub fn write_byte(&mut self, value: u8) {
+ fn write_byte(&mut self, value: u8) {
self.write_raw(&[value]);
}
- pub fn write_binary(&mut self, bytes: &[u8], length: usize) {
- // TODO: currently, we encoding BINARY(length) as the same with BYTES,
the length info can
- // be omitted and the bytes length should be enforced in the future.
- self.write_bytes(&bytes[..length.min(bytes.len())]);
- }
-
- pub fn write_bytes(&mut self, value: &[u8]) {
+ fn write_bytes(&mut self, value: &[u8]) {
let len_i32 =
i32::try_from(value.len()).expect("byte slice too large to encode
length as i32");
self.write_int(len_i32);
self.write_raw(value);
}
- pub fn write_char(&mut self, value: &str, _length: usize) {
+ fn write_char(&mut self, value: &str, _length: usize) {
// TODO: currently, we encoding CHAR(length) as the same with STRING,
the length info can be
// omitted and the bytes length should be enforced in the future.
self.write_string(value);
}
- pub fn write_string(&mut self, value: &str) {
+ fn write_string(&mut self, value: &str) {
self.write_bytes(value.as_ref());
}
- pub fn write_short(&mut self, value: i16) {
+ fn write_short(&mut self, value: i16) {
self.write_raw(&value.to_ne_bytes());
}
- pub fn write_int(&mut self, value: i32) {
+ fn write_int(&mut self, value: i32) {
self.ensure_capacity(Self::MAX_INT_SIZE);
let bytes_written =
write_unsigned_varint_to_slice(value as u32, &mut
self.buffer[self.position..]);
self.position += bytes_written;
}
- pub fn write_long(&mut self, value: i64) {
+ fn write_long(&mut self, value: i64) {
self.ensure_capacity(Self::MAX_LONG_SIZE);
let bytes_written =
write_unsigned_varint_u64_to_slice(value as u64, &mut
self.buffer[self.position..]);
self.position += bytes_written;
}
-
- pub fn write_float(&mut self, value: f32) {
+ fn write_float(&mut self, value: f32) {
self.write_raw(&value.to_ne_bytes());
}
- pub fn write_double(&mut self, value: f64) {
+ fn write_double(&mut self, value: f64) {
self.write_raw(&value.to_ne_bytes());
}
-}
-impl BinaryRow for CompactedRowWriter {
- fn as_bytes(&self) -> &[u8] {
- self.buffer()
+ fn write_binary(&mut self, bytes: &[u8], length: usize) {
+ // TODO: currently, we encoding BINARY(length) as the same with BYTES,
the length info can
+ // be omitted and the bytes length should be enforced in the future.
+ self.write_bytes(&bytes[..length.min(bytes.len())]);
+ }
+
+ fn complete(&mut self) {
+ // do nothing
}
}
diff --git a/crates/fluss/src/row/encode/compacted_row_encoder.rs
b/crates/fluss/src/row/encode/compacted_row_encoder.rs
new file mode 100644
index 0000000..fc39bb7
--- /dev/null
+++ b/crates/fluss/src/row/encode/compacted_row_encoder.rs
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::Error::IllegalArgument;
+use crate::error::Result;
+use crate::metadata::DataType;
+use crate::row::Datum;
+use crate::row::binary::{BinaryRowFormat, BinaryWriter, ValueWriter};
+use crate::row::compacted::{CompactedRow, CompactedRowDeserializer,
CompactedRowWriter};
+use crate::row::encode::{BinaryRow, RowEncoder};
+use std::sync::Arc;
+
+#[allow(dead_code)]
+pub struct CompactedRowEncoder<'a> {
+ arity: usize,
+ writer: CompactedRowWriter,
+ field_writers: Vec<ValueWriter>,
+ compacted_row_deserializer: Arc<CompactedRowDeserializer<'a>>,
+}
+
+impl<'a> CompactedRowEncoder<'a> {
+ pub fn new(field_data_types: Vec<DataType>) -> Result<Self> {
+ let field_writers = field_data_types
+ .iter()
+ .map(|d| ValueWriter::create_value_writer(d,
Some(&BinaryRowFormat::Compacted)))
+ .collect::<Result<Vec<_>>>()?;
+
+ Ok(Self {
+ arity: field_data_types.len(),
+ writer: CompactedRowWriter::new(field_data_types.len()),
+ field_writers,
+ compacted_row_deserializer:
Arc::new(CompactedRowDeserializer::new_from_owned(
+ field_data_types,
+ )),
+ })
+ }
+}
+
+impl RowEncoder for CompactedRowEncoder<'_> {
+ fn start_new_row(&mut self) -> Result<()> {
+ self.writer.reset();
+ Ok(())
+ }
+
+ fn encode_field(&mut self, pos: usize, value: Datum) -> Result<()> {
+ self.field_writers
+ .get(pos)
+ .ok_or_else(|| IllegalArgument {
+ message: format!(
+ "invalid position {} when attempting to encode value {}",
+ pos, value
+ ),
+ })?
+ .write_value(&mut self.writer, pos, &value)
+ }
+
+ fn finish_row(&mut self) -> Result<impl BinaryRow> {
+ Ok(CompactedRow::deserialize(
+ Arc::clone(&self.compacted_row_deserializer),
+ self.arity,
+ self.writer.buffer(),
+ ))
+ }
+
+ fn close(&mut self) -> Result<()> {
+ // do nothing
+ Ok(())
+ }
+}
diff --git a/crates/fluss/src/row/encode/mod.rs
b/crates/fluss/src/row/encode/mod.rs
index 6c6eed9..34863ab 100644
--- a/crates/fluss/src/row/encode/mod.rs
+++ b/crates/fluss/src/row/encode/mod.rs
@@ -16,11 +16,13 @@
// under the License.
mod compacted_key_encoder;
+mod compacted_row_encoder;
use crate::error::Result;
-use crate::metadata::{DataLakeFormat, RowType};
-use crate::row::InternalRow;
+use crate::metadata::{DataLakeFormat, DataType, KvFormat, RowType};
use crate::row::encode::compacted_key_encoder::CompactedKeyEncoder;
+use crate::row::encode::compacted_row_encoder::CompactedRowEncoder;
+use crate::row::{BinaryRow, Datum, InternalRow};
use bytes::Bytes;
/// An interface for encoding key of row into bytes.
@@ -62,3 +64,65 @@ impl dyn KeyEncoder {
}
}
}
+
+/// An encoder to write [`BinaryRow`]. It's used to write row
+/// multi-times one by one. When writing a new row:
+///
+/// 1. call method [`RowEncoder::start_new_row()`] to start the writing.
+/// 2. call method [`RowEncoder::encode_field()`] to write the row's field.
+/// 3. call method [`RowEncoder::finishRow()`] to finish the writing and get
the written row.
+#[allow(dead_code)]
+pub trait RowEncoder {
+ /// Start to write a new row.
+ ///
+ /// # Returns
+ /// * Ok(()) if successful
+ fn start_new_row(&mut self) -> Result<()>;
+
+ /// Write the row's field in given pos with given value.
+ ///
+ /// # Arguments
+ /// * pos - the position of the field to write.
+ /// * value - the value of the field to write.
+ ///
+ /// # Returns
+ /// * Ok(()) if successful
+ fn encode_field(&mut self, pos: usize, value: Datum) -> Result<()>;
+
+ /// Finish write the row, returns the written row.
+ ///
+ /// Note that returned row borrows from [`RowEncoder`]'s internal buffer
which is reused for subsequent rows
+ /// [`RowEncoder::start_new_row()`] should only be called after the
returned row goes out of scope.
+ ///
+ /// # Returns
+ /// * the written row
+ fn finish_row(&mut self) -> Result<impl BinaryRow>;
+
+ /// Closes the row encoder
+ ///
+ /// # Returns
+ /// * Ok(()) if successful
+ fn close(&mut self) -> Result<()>;
+}
+
+#[allow(dead_code)]
+pub struct RowEncoderFactory {}
+
+#[allow(dead_code)]
+impl RowEncoderFactory {
+ pub fn create(kv_format: KvFormat, row_type: &RowType) -> Result<impl
RowEncoder> {
+ Self::create_for_field_types(kv_format,
row_type.field_types().cloned().collect())
+ }
+
+ pub fn create_for_field_types(
+ kv_format: KvFormat,
+ field_data_types: Vec<DataType>,
+ ) -> Result<impl RowEncoder> {
+ match kv_format {
+ KvFormat::INDEXED => {
+ todo!()
+ }
+ KvFormat::COMPACTED => CompactedRowEncoder::new(field_data_types),
+ }
+ }
+}
diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs
index 144d64f..4996063 100644
--- a/crates/fluss/src/row/mod.rs
+++ b/crates/fluss/src/row/mod.rs
@@ -19,7 +19,7 @@ mod column;
mod datum;
-mod binary;
+pub mod binary;
pub mod compacted;
mod encode;
mod field_getter;
@@ -27,7 +27,7 @@ mod field_getter;
pub use column::*;
pub use datum::*;
-pub trait BinaryRow {
+pub trait BinaryRow: InternalRow {
/// Returns the binary representation of this row as a byte slice.
fn as_bytes(&self) -> &[u8];
}