This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 1d74c4a  feat: Introduce UpsertWriter (#169)
1d74c4a is described below

commit 1d74c4a6bbc0e421b1b95b84d998401ca30cea39
Author: Keith Lee <[email protected]>
AuthorDate: Tue Jan 20 13:09:49 2026 +0000

    feat: Introduce UpsertWriter (#169)
---
 crates/fluss/Cargo.toml                            |   1 +
 crates/fluss/src/client/table/log_fetch_buffer.rs  |   6 +-
 crates/fluss/src/client/table/lookup.rs            |   9 +-
 crates/fluss/src/client/table/mod.rs               |  18 +
 crates/fluss/src/client/table/partition_getter.rs  |  56 +++
 crates/fluss/src/client/table/upsert.rs            | 522 +++++++++++++++++++++
 crates/fluss/src/client/table/writer.rs            |  20 +-
 crates/fluss/src/client/write/accumulator.rs       |  39 +-
 crates/fluss/src/client/write/batch.rs             |  20 +-
 crates/fluss/src/client/write/bucket_assigner.rs   |   9 +-
 crates/fluss/src/client/write/mod.rs               |  54 ++-
 crates/fluss/src/client/write/write_format.rs      |   1 +
 crates/fluss/src/client/write/writer_client.rs     |   9 +-
 crates/fluss/src/metadata/table.rs                 |  66 ++-
 crates/fluss/src/record/arrow.rs                   |  62 ++-
 crates/fluss/src/record/kv/kv_record_batch.rs      |  11 +-
 .../fluss/src/record/kv/kv_record_batch_builder.rs |  80 ++--
 .../fluss/src/record/kv/kv_record_read_context.rs  |  17 +-
 crates/fluss/src/row/compacted/compacted_row.rs    |  15 +-
 .../src/row/compacted/compacted_row_writer.rs      |  12 +
 .../fluss/src/row/encode/compacted_row_encoder.rs  |  13 +-
 crates/fluss/src/row/encode/mod.rs                 |  23 +-
 crates/fluss/src/row/mod.rs                        |  27 +-
 23 files changed, 895 insertions(+), 195 deletions(-)

diff --git a/crates/fluss/Cargo.toml b/crates/fluss/Cargo.toml
index c3bdd44..9aeee72 100644
--- a/crates/fluss/Cargo.toml
+++ b/crates/fluss/Cargo.toml
@@ -33,6 +33,7 @@ integration_tests = []
 [dependencies]
 arrow = { workspace = true }
 arrow-schema = "57.0.0"
+bitvec = "1"
 byteorder = "1.5"
 futures = "0.3"
 clap = { workspace = true }
diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs 
b/crates/fluss/src/client/table/log_fetch_buffer.rs
index ac44cc1..ca0a253 100644
--- a/crates/fluss/src/client/table/log_fetch_buffer.rs
+++ b/crates/fluss/src/client/table/log_fetch_buffer.rs
@@ -651,14 +651,14 @@ mod tests {
     use crate::compression::{
         ArrowCompressionInfo, ArrowCompressionType, 
DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
     };
-    use crate::metadata::{DataField, DataTypes, TablePath};
+    use crate::metadata::{DataField, DataTypes, RowType, TablePath};
     use crate::record::{MemoryLogRecordsArrowBuilder, ReadContext, 
to_arrow_schema};
     use crate::row::GenericRow;
     use std::sync::Arc;
     use std::time::Duration;
 
     fn test_read_context() -> ReadContext {
-        let row_type = DataTypes::row(vec![DataField::new(
+        let row_type = RowType::new(vec![DataField::new(
             "id".to_string(),
             DataTypes::int(),
             None,
@@ -714,7 +714,7 @@ mod tests {
 
     #[test]
     fn default_completed_fetch_reads_records() -> Result<()> {
-        let row_type = DataTypes::row(vec![
+        let row_type = RowType::new(vec![
             DataField::new("id".to_string(), DataTypes::int(), None),
             DataField::new("name".to_string(), DataTypes::string(), None),
         ]);
diff --git a/crates/fluss/src/client/table/lookup.rs 
b/crates/fluss/src/client/table/lookup.rs
index 1d32ebd..cd23503 100644
--- a/crates/fluss/src/client/table/lookup.rs
+++ b/crates/fluss/src/client/table/lookup.rs
@@ -22,7 +22,7 @@ use crate::error::{Error, Result};
 use crate::metadata::{RowType, TableBucket, TableInfo};
 use crate::row::InternalRow;
 use crate::row::compacted::CompactedRow;
-use crate::row::encode::KeyEncoder;
+use crate::row::encode::{KeyEncoder, KeyEncoderFactory};
 use crate::rpc::ApiError;
 use crate::rpc::message::LookupRequest;
 use std::sync::Arc;
@@ -130,8 +130,11 @@ impl<'a> TableLookup<'a> {
 
         // Create key encoder for the primary key fields
         let pk_fields = self.table_info.get_physical_primary_keys().to_vec();
-        let key_encoder =
-            <dyn KeyEncoder>::of(self.table_info.row_type(), pk_fields, 
data_lake_format)?;
+        let key_encoder = KeyEncoderFactory::of(
+            self.table_info.row_type(),
+            pk_fields.as_slice(),
+            &data_lake_format,
+        )?;
 
         Ok(Lookuper {
             conn: self.conn,
diff --git a/crates/fluss/src/client/table/mod.rs 
b/crates/fluss/src/client/table/mod.rs
index 7356be2..2bfa054 100644
--- a/crates/fluss/src/client/table/mod.rs
+++ b/crates/fluss/src/client/table/mod.rs
@@ -27,13 +27,17 @@ mod append;
 mod lookup;
 
 mod log_fetch_buffer;
+mod partition_getter;
 mod remote_log;
 mod scanner;
+mod upsert;
 mod writer;
 
+use crate::client::table::upsert::TableUpsert;
 pub use append::{AppendWriter, TableAppend};
 pub use lookup::{LookupResult, Lookuper, TableLookup};
 pub use scanner::{LogScanner, RecordBatchLogScanner, TableScan};
+pub use writer::{TableWriter, UpsertWriter};
 
 #[allow(dead_code)]
 pub struct FlussTable<'a> {
@@ -119,6 +123,20 @@ impl<'a> FlussTable<'a> {
             self.metadata.clone(),
         ))
     }
+
+    pub fn new_upsert(&self) -> Result<TableUpsert> {
+        if !self.has_primary_key {
+            return Err(Error::UnsupportedOperation {
+                message: "Upsert is only supported for primary key 
tables".to_string(),
+            });
+        }
+
+        Ok(TableUpsert::new(
+            self.table_path.clone(),
+            self.table_info.clone(),
+            self.conn.get_or_create_writer_client()?,
+        ))
+    }
 }
 
 impl<'a> Drop for FlussTable<'a> {
diff --git a/crates/fluss/src/client/table/partition_getter.rs 
b/crates/fluss/src/client/table/partition_getter.rs
new file mode 100644
index 0000000..4529d86
--- /dev/null
+++ b/crates/fluss/src/client/table/partition_getter.rs
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::Error::IllegalArgument;
+use crate::error::Result;
+use crate::metadata::{DataType, RowType};
+use crate::row::field_getter::FieldGetter;
+
+#[allow(dead_code)]
+pub struct PartitionGetter<'a> {
+    partitions: Vec<(&'a String, &'a DataType, FieldGetter)>,
+}
+
+#[allow(dead_code)]
+impl<'a> PartitionGetter<'a> {
+    pub fn new(row_type: &'a RowType, partition_keys: &'a Vec<String>) -> 
Result<Self> {
+        let mut partitions = Vec::with_capacity(partition_keys.len());
+
+        for partition_key in partition_keys {
+            if let Some(partition_col_index) = 
row_type.get_field_index(partition_key.as_str()) {
+                let data_type = &row_type
+                    .fields()
+                    .get(partition_col_index)
+                    .unwrap()
+                    .data_type;
+                let field_getter = FieldGetter::create(data_type, 
partition_col_index);
+
+                partitions.push((partition_key, data_type, field_getter));
+            } else {
+                return Err(IllegalArgument {
+                    message: format!(
+                        "The partition column {partition_key} is not in the 
row {row_type}."
+                    ),
+                });
+            };
+        }
+
+        Ok(Self { partitions })
+    }
+
+    // TODO Implement get partition
+}
diff --git a/crates/fluss/src/client/table/upsert.rs 
b/crates/fluss/src/client/table/upsert.rs
new file mode 100644
index 0000000..a3909e7
--- /dev/null
+++ b/crates/fluss/src/client/table/upsert.rs
@@ -0,0 +1,522 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::client::table::writer::{DeleteResult, TableWriter, UpsertResult, 
UpsertWriter};
+use crate::client::{RowBytes, WriteFormat, WriteRecord, WriterClient};
+use crate::error::Error::IllegalArgument;
+use crate::error::Result;
+use crate::metadata::{KvFormat, RowType, TableInfo, TablePath};
+use crate::row::InternalRow;
+use crate::row::encode::{KeyEncoder, KeyEncoderFactory, RowEncoder, 
RowEncoderFactory};
+use crate::row::field_getter::FieldGetter;
+use std::sync::Arc;
+
+use bitvec::prelude::bitvec;
+use bytes::Bytes;
+
+#[allow(dead_code)]
+pub struct TableUpsert {
+    table_path: TablePath,
+    table_info: TableInfo,
+    writer_client: Arc<WriterClient>,
+    target_columns: Option<Arc<Vec<usize>>>,
+}
+
+#[allow(dead_code)]
+impl TableUpsert {
+    pub fn new(
+        table_path: TablePath,
+        table_info: TableInfo,
+        writer_client: Arc<WriterClient>,
+    ) -> Self {
+        Self {
+            table_path,
+            table_info,
+            writer_client,
+            target_columns: None,
+        }
+    }
+
+    pub fn partial_update(&self, target_columns: Option<Vec<usize>>) -> 
Result<Self> {
+        if let Some(columns) = &target_columns {
+            let num_columns = self.table_info.row_type().fields().len();
+
+            if let Some(&invalid_column) = columns.iter().find(|&&col| col >= 
num_columns) {
+                return Err(IllegalArgument {
+                    message: format!(
+                        "Invalid target column index: {invalid_column} for 
table {}. The table only has {num_columns} columns.",
+                        self.table_path
+                    ),
+                });
+            }
+        }
+
+        Ok(Self {
+            table_path: self.table_path.clone(),
+            table_info: self.table_info.clone(),
+            writer_client: self.writer_client.clone(),
+            target_columns: target_columns.map(Arc::new),
+        })
+    }
+
+    pub fn partial_update_with_column_names(&self, target_column_names: 
&[&str]) -> Result<Self> {
+        let row_type = self.table_info.row_type();
+        let col_indices: Vec<(&str, Option<usize>)> = target_column_names
+            .iter()
+            .map(|col_name| (*col_name, row_type.get_field_index(col_name)))
+            .collect();
+
+        if let Some((missing_name, _)) = col_indices.iter().find(|(_, ix)| 
ix.is_none()) {
+            return Err(IllegalArgument {
+                message: format!(
+                    "Cannot find target column `{}` for table {}.",
+                    missing_name, self.table_path
+                ),
+            });
+        }
+
+        let valid_col_indices: Vec<usize> = col_indices
+            .into_iter()
+            .map(|(_, index)| index.unwrap())
+            .collect();
+
+        self.partial_update(Some(valid_col_indices))
+    }
+
+    pub fn create_writer(&self) -> Result<impl UpsertWriter> {
+        UpsertWriterFactory::create(
+            Arc::new(self.table_path.clone()),
+            Arc::new(self.table_info.clone()),
+            self.target_columns.clone(),
+            Arc::clone(&self.writer_client),
+        )
+    }
+}
+
+#[allow(dead_code)]
+struct UpsertWriterImpl<RE>
+where
+    RE: RowEncoder,
+{
+    table_path: Arc<TablePath>,
+    writer_client: Arc<WriterClient>,
+    // TODO: Partitioning
+    // partition_field_getter: Option<Box<dyn KeyEncoder>>,
+    primary_key_encoder: Box<dyn KeyEncoder>,
+    target_columns: Option<Arc<Vec<usize>>>,
+    // Use primary key encoder as bucket key encoder when None
+    bucket_key_encoder: Option<Box<dyn KeyEncoder>>,
+    kv_format: KvFormat,
+    write_format: WriteFormat,
+    row_encoder: RE,
+    field_getters: Box<[FieldGetter]>,
+    table_info: Arc<TableInfo>,
+}
+
+#[allow(dead_code)]
+struct UpsertWriterFactory;
+
+#[allow(dead_code)]
+impl UpsertWriterFactory {
+    pub fn create(
+        table_path: Arc<TablePath>,
+        table_info: Arc<TableInfo>,
+        partial_update_columns: Option<Arc<Vec<usize>>>,
+        writer_client: Arc<WriterClient>,
+    ) -> Result<impl UpsertWriter> {
+        let data_lake_format = &table_info.table_config.get_datalake_format()?;
+        let row_type = table_info.row_type();
+        let physical_pks = table_info.get_physical_primary_keys();
+
+        let names = table_info.get_schema().auto_increment_col_names();
+
+        Self::sanity_check(
+            row_type,
+            &table_info.primary_keys,
+            names,
+            &partial_update_columns,
+        )?;
+
+        let primary_key_encoder = KeyEncoderFactory::of(row_type, 
physical_pks, data_lake_format)?;
+        let bucket_key_encoder = if !table_info.is_default_bucket_key() {
+            Some(KeyEncoderFactory::of(
+                row_type,
+                table_info.get_bucket_keys(),
+                data_lake_format,
+            )?)
+        } else {
+            // Defaults to using primary key encoder when None for bucket key
+            None
+        };
+
+        let kv_format = table_info.get_table_config().get_kv_format()?;
+        let write_format = WriteFormat::from_kv_format(&kv_format)?;
+
+        let field_getters = FieldGetter::create_field_getters(row_type);
+
+        Ok(UpsertWriterImpl {
+            table_path,
+            writer_client,
+            primary_key_encoder,
+            target_columns: partial_update_columns,
+            bucket_key_encoder,
+            kv_format: kv_format.clone(),
+            write_format,
+            row_encoder: RowEncoderFactory::create(kv_format, 
row_type.clone())?,
+            field_getters,
+            table_info: table_info.clone(),
+        })
+    }
+
+    #[allow(dead_code)]
+    fn sanity_check(
+        row_type: &RowType,
+        primary_keys: &Vec<String>,
+        auto_increment_col_names: &Vec<String>,
+        target_columns: &Option<Arc<Vec<usize>>>,
+    ) -> Result<()> {
+        if target_columns.is_none() {
+            if !auto_increment_col_names.is_empty() {
+                return Err(IllegalArgument {
+                    message: format!(
+                        "This table has auto increment column {}. Explicitly 
specifying values for an auto increment column is not allowed. Please Specify 
non-auto-increment columns as target columns using partialUpdate first.",
+                        auto_increment_col_names.join(", ")
+                    ),
+                });
+            }
+            return Ok(());
+        }
+
+        let field_count = row_type.fields().len();
+
+        let mut target_column_set = bitvec![0; field_count];
+
+        let columns = target_columns.as_ref().unwrap().as_ref();
+
+        for &target_index in columns {
+            target_column_set.set(target_index, true);
+        }
+
+        let mut pk_column_set = bitvec![0; field_count];
+
+        // check that the target columns contain the primary key
+        for primary_key in primary_keys {
+            let pk_index = row_type.get_field_index(primary_key.as_str());
+            match pk_index {
+                Some(pk_index) => {
+                    if !target_column_set[pk_index] {
+                        return Err(IllegalArgument {
+                            message: format!(
+                                "The target write columns {} must contain the 
primary key columns {}",
+                                
row_type.project(columns)?.get_field_names().join(", "),
+                                primary_keys.join(", ")
+                            ),
+                        });
+                    }
+                    pk_column_set.set(pk_index, true);
+                }
+                None => {
+                    return Err(IllegalArgument {
+                        message: format!(
+                            "The specified primary key {} is not in row type 
{}",
+                            primary_key, row_type
+                        ),
+                    });
+                }
+            }
+        }
+
+        let mut auto_increment_column_set = bitvec![0; field_count];
+        // explicitly specifying values for an auto increment column is not 
allowed
+        for auto_increment_col_name in auto_increment_col_names {
+            let auto_increment_field_index =
+                row_type.get_field_index(auto_increment_col_name.as_str());
+
+            if let Some(index) = auto_increment_field_index {
+                if target_column_set[index] {
+                    return Err(IllegalArgument {
+                        message: format!(
+                            "Explicitly specifying values for the auto 
increment column {} is not allowed.",
+                            auto_increment_col_name
+                        ),
+                    });
+                }
+
+                auto_increment_column_set.set(index, true);
+            }
+        }
+
+        // check the columns not in targetColumns should be nullable
+        for i in 0..field_count {
+            // column not in primary key and not in auto increment column
+            if !pk_column_set[i] && !auto_increment_column_set[i] {
+                // the column should be nullable
+                if !row_type.fields().get(i).unwrap().data_type.is_nullable() {
+                    return Err(IllegalArgument {
+                        message: format!(
+                            "Partial Update requires all columns except 
primary key to be nullable, but column {} is NOT NULL.",
+                            row_type.fields().get(i).unwrap().name()
+                        ),
+                    });
+                }
+            }
+        }
+
+        Ok(())
+    }
+}
+
+#[allow(dead_code)]
+impl<RE: RowEncoder> UpsertWriterImpl<RE> {
+    fn check_field_count<R: InternalRow>(&self, row: &R) -> Result<()> {
+        let expected = self.table_info.get_row_type().fields().len();
+        if row.get_field_count() != expected {
+            return Err(IllegalArgument {
+                message: format!(
+                    "The field count of the row does not match the table 
schema. Expected: {}, Actual: {}",
+                    expected,
+                    row.get_field_count()
+                ),
+            });
+        }
+        Ok(())
+    }
+
+    fn get_keys(&mut self, row: &dyn InternalRow) -> Result<(Bytes, 
Option<Bytes>)> {
+        let key = self.primary_key_encoder.encode_key(row)?;
+        let bucket_key = match &mut self.bucket_key_encoder {
+            Some(bucket_key_encoder) => 
Some(bucket_key_encoder.encode_key(row)?),
+            None => Some(key.clone()),
+        };
+        Ok((key, bucket_key))
+    }
+
+    fn encode_row<R: InternalRow>(&mut self, row: &R) -> Result<Bytes> {
+        self.row_encoder.start_new_row()?;
+        for (pos, field_getter) in self.field_getters.iter().enumerate() {
+            let datum = field_getter.get_field(row);
+            self.row_encoder.encode_field(pos, datum)?;
+        }
+        self.row_encoder.finish_row()
+    }
+}
+
+impl<RE: RowEncoder> TableWriter for UpsertWriterImpl<RE> {
+    /// Flushes written data that has not yet been sent to the server, forcing 
the client to send the
+    /// requests to the server and blocking on the completion of the requests 
associated with these
+    /// records. A request is considered completed when it is successfully 
acknowledged according to
+    /// the CLIENT_WRITER_ACKS configuration option you have specified or else 
it
+    /// results in an error.
+    async fn flush(&self) -> Result<()> {
+        self.writer_client.flush().await
+    }
+}
+
+impl<RE: RowEncoder> UpsertWriter for UpsertWriterImpl<RE> {
+    /// Inserts the row into the Fluss table if it does not already exist, or 
updates it if it does.
+    ///
+    /// # Arguments
+    /// * row - the row to upsert.
+    ///
+    /// # Returns
+    /// Ok(UpsertResult) when completed normally
+    async fn upsert<R: InternalRow>(&mut self, row: &R) -> 
Result<UpsertResult> {
+        self.check_field_count(row)?;
+
+        let (key, bucket_key) = self.get_keys(row)?;
+
+        let row_bytes: RowBytes<'_> = match 
row.as_encoded_bytes(self.write_format) {
+            Some(bytes) => RowBytes::Borrowed(bytes),
+            None => RowBytes::Owned(self.encode_row(row)?),
+        };
+
+        let write_record = WriteRecord::for_upsert(
+            Arc::clone(&self.table_path),
+            self.table_info.schema_id,
+            key,
+            bucket_key,
+            self.write_format,
+            self.target_columns.clone(),
+            Some(row_bytes),
+        );
+
+        let result_handle = self.writer_client.send(&write_record).await?;
+        let result = result_handle.wait().await?;
+
+        result_handle.result(result).map(|_| UpsertResult)
+    }
+
+    /// Deletes the row matching the input row from the Fluss table; the input 
row must contain the
+    /// primary key.
+    ///
+    /// # Arguments
+    /// * row - the row to delete.
+    ///
+    /// # Returns
+    /// Ok(DeleteResult) when completed normally
+    async fn delete<R: InternalRow>(&mut self, row: &R) -> 
Result<DeleteResult> {
+        self.check_field_count(row)?;
+
+        let (key, bucket_key) = self.get_keys(row)?;
+
+        let write_record = WriteRecord::for_upsert(
+            Arc::clone(&self.table_path),
+            self.table_info.schema_id,
+            key,
+            bucket_key,
+            self.write_format,
+            self.target_columns.clone(),
+            None,
+        );
+
+        let result_handle = self.writer_client.send(&write_record).await?;
+        let result = result_handle.wait().await?;
+
+        result_handle.result(result).map(|_| DeleteResult)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::metadata::{DataField, DataTypes};
+
+    #[test]
+    fn sanity_check() {
+        // No target columns specified but table has auto-increment column
+        let fields = vec![
+            DataField::new("id".to_string(), 
DataTypes::int().as_non_nullable(), None),
+            DataField::new("name".to_string(), DataTypes::string(), None),
+        ];
+        let row_type = RowType::new(fields);
+        let primary_keys = vec!["id".to_string()];
+        let auto_increment_col_names = vec!["id".to_string()];
+        let target_columns = None;
+
+        let result = UpsertWriterFactory::sanity_check(
+            &row_type,
+            &primary_keys,
+            &auto_increment_col_names,
+            &target_columns,
+        );
+
+        assert!(result.unwrap_err().to_string().contains(
+            "This table has auto increment column id. Explicitly specifying 
values for an auto increment column is not allowed. Please Specify 
non-auto-increment columns as target columns using partialUpdate first."
+        ));
+
+        // Target columns do not contain primary key
+        let fields = vec![
+            DataField::new("id".to_string(), 
DataTypes::int().as_non_nullable(), None),
+            DataField::new("name".to_string(), DataTypes::string(), None),
+            DataField::new("value".to_string(), DataTypes::int(), None),
+        ];
+        let row_type = RowType::new(fields);
+        let primary_keys = vec!["id".to_string()];
+        let auto_increment_col_names = vec![];
+        let target_columns = Some(Arc::new(vec![1usize]));
+
+        let result = UpsertWriterFactory::sanity_check(
+            &row_type,
+            &primary_keys,
+            &auto_increment_col_names,
+            &target_columns,
+        );
+
+        assert!(
+            result
+                .unwrap_err()
+                .to_string()
+                .contains("The target write columns name must contain the 
primary key columns id")
+        );
+
+        // Primary key column not found in row type
+        let fields = vec![
+            DataField::new("id".to_string(), 
DataTypes::int().as_non_nullable(), None),
+            DataField::new("name".to_string(), DataTypes::string(), None),
+        ];
+        let row_type = RowType::new(fields);
+        let primary_keys = vec!["nonexistent_pk".to_string()];
+        let auto_increment_col_names = vec![];
+        let target_columns = Some(Arc::new(vec![0usize, 1]));
+
+        let result = UpsertWriterFactory::sanity_check(
+            &row_type,
+            &primary_keys,
+            &auto_increment_col_names,
+            &target_columns,
+        );
+
+        assert!(
+            result
+                .unwrap_err()
+                .to_string()
+                .contains("The specified primary key nonexistent_pk is not in 
row type")
+        );
+
+        // Target columns include auto-increment column
+        let fields = vec![
+            DataField::new("id".to_string(), 
DataTypes::int().as_non_nullable(), None),
+            DataField::new(
+                "seq".to_string(),
+                DataTypes::bigint().as_non_nullable(),
+                None,
+            ),
+            DataField::new("name".to_string(), DataTypes::string(), None),
+        ];
+        let row_type = RowType::new(fields);
+        let primary_keys = vec!["id".to_string()];
+        let auto_increment_col_names = vec!["seq".to_string()];
+        let target_columns = Some(Arc::new(vec![0usize, 1, 2]));
+
+        let result = UpsertWriterFactory::sanity_check(
+            &row_type,
+            &primary_keys,
+            &auto_increment_col_names,
+            &target_columns,
+        );
+
+        assert!(result.unwrap_err().to_string().contains(
+            "Explicitly specifying values for the auto increment column seq is 
not allowed."
+        ));
+
+        // Non-nullable column not in target columns (partial update requires 
nullable)
+        let fields = vec![
+            DataField::new("id".to_string(), 
DataTypes::int().as_non_nullable(), None),
+            DataField::new(
+                "required_field".to_string(),
+                DataTypes::string().as_non_nullable(),
+                None,
+            ),
+            DataField::new("optional_field".to_string(), DataTypes::int(), 
None),
+        ];
+        let row_type = RowType::new(fields);
+        let primary_keys = vec!["id".to_string()];
+        let auto_increment_col_names = vec![];
+        let target_columns = Some(Arc::new(vec![0usize]));
+
+        let result = UpsertWriterFactory::sanity_check(
+            &row_type,
+            &primary_keys,
+            &auto_increment_col_names,
+            &target_columns,
+        );
+
+        assert!(result.unwrap_err().to_string().contains(
+            "Partial Update requires all columns except primary key to be 
nullable, but column required_field is NOT NULL."
+        ));
+    }
+}
diff --git a/crates/fluss/src/client/table/writer.rs 
b/crates/fluss/src/client/table/writer.rs
index 8a83b5e..8276545 100644
--- a/crates/fluss/src/client/table/writer.rs
+++ b/crates/fluss/src/client/table/writer.rs
@@ -16,13 +16,13 @@
 // under the License.
 
 use crate::client::{WriteRecord, WriterClient};
-use crate::row::GenericRow;
+use crate::row::{GenericRow, InternalRow};
 use std::sync::Arc;
 
 use crate::error::Result;
 use crate::metadata::{TableInfo, TablePath};
 
-#[allow(dead_code)]
+#[allow(dead_code, async_fn_in_trait)]
 pub trait TableWriter {
     async fn flush(&self) -> Result<()>;
 }
@@ -32,12 +32,22 @@ pub trait AppendWriter: TableWriter {
     async fn append(&self, row: GenericRow) -> Result<()>;
 }
 
-#[allow(dead_code)]
+#[allow(dead_code, async_fn_in_trait)]
 pub trait UpsertWriter: TableWriter {
-    async fn upsert(&self, row: GenericRow) -> Result<()>;
-    async fn delete(&self, row: GenericRow) -> Result<()>;
+    async fn upsert<R: InternalRow>(&mut self, row: &R) -> 
Result<UpsertResult>;
+    async fn delete<R: InternalRow>(&mut self, row: &R) -> 
Result<DeleteResult>;
 }
 
+/// The result of upserting a record.
+/// Currently this is an empty struct to allow for compatible evolution in the 
future.
+#[derive(Default)]
+pub struct UpsertResult;
+
+/// The result of deleting a record.
+/// Currently this is an empty struct to allow for compatible evolution in the 
future.
+#[derive(Default)]
+pub struct DeleteResult;
+
 #[allow(dead_code)]
 pub struct AbstractTableWriter {
     table_path: Arc<TablePath>,
diff --git a/crates/fluss/src/client/write/accumulator.rs 
b/crates/fluss/src/client/write/accumulator.rs
index 0afc9d4..fb7b544 100644
--- a/crates/fluss/src/client/write/accumulator.rs
+++ b/crates/fluss/src/client/write/accumulator.rs
@@ -15,8 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::client::write::batch::WriteBatch::ArrowLog;
-use crate::client::write::batch::{ArrowLogWriteBatch, WriteBatch};
+use crate::client::write::batch::WriteBatch::{ArrowLog, Kv};
+use crate::client::write::batch::{ArrowLogWriteBatch, KvWriteBatch, 
WriteBatch};
 use crate::client::{LogWriteRecord, Record, ResultHandle, WriteRecord};
 use crate::cluster::{BucketLocation, Cluster, ServerNode};
 use crate::config::Config;
@@ -102,16 +102,29 @@ impl RecordAccumulator {
 
         let schema_id = table_info.schema_id;
 
-        let mut batch = ArrowLog(ArrowLogWriteBatch::new(
-            self.batch_id.fetch_add(1, Ordering::Relaxed),
-            table_path.as_ref().clone(),
-            schema_id,
-            arrow_compression_info,
-            row_type,
-            bucket_id,
-            current_time_ms(),
-            matches!(&record.record, 
Record::Log(LogWriteRecord::RecordBatch(_))),
-        ));
+        let mut batch: WriteBatch = match record.record() {
+            Record::Log(_) => ArrowLog(ArrowLogWriteBatch::new(
+                self.batch_id.fetch_add(1, Ordering::Relaxed),
+                table_path.as_ref().clone(),
+                schema_id,
+                arrow_compression_info,
+                row_type,
+                bucket_id,
+                current_time_ms(),
+                matches!(&record.record, 
Record::Log(LogWriteRecord::RecordBatch(_))),
+            )),
+            Record::Kv(kv_record) => Kv(KvWriteBatch::new(
+                self.batch_id.fetch_add(1, Ordering::Relaxed),
+                table_path.as_ref().clone(),
+                schema_id,
+                // TODO: Decide how to derive write limit in the absence of 
java's equivalent of PreAllocatedPagedOutputView
+                KvWriteBatch::DEFAULT_WRITE_LIMIT,
+                record.write_format.to_kv_format()?,
+                bucket_id,
+                kv_record.target_columns.clone(),
+                current_time_ms(),
+            )),
+        };
 
         let batch_id = batch.batch_id();
 
@@ -142,6 +155,8 @@ impl RecordAccumulator {
     ) -> Result<RecordAppendResult> {
         let table_path = &record.table_path;
 
+        // TODO: Implement partitioning
+
         let dq = {
             let mut binding = self
                 .write_batches
diff --git a/crates/fluss/src/client/write/batch.rs 
b/crates/fluss/src/client/write/batch.rs
index 0159753..2ddf519 100644
--- a/crates/fluss/src/client/write/batch.rs
+++ b/crates/fluss/src/client/write/batch.rs
@@ -20,11 +20,12 @@ use crate::client::broadcast::{BatchWriteResult, 
BroadcastOnce};
 use crate::client::{Record, ResultHandle, WriteRecord};
 use crate::compression::ArrowCompressionInfo;
 use crate::error::{Error, Result};
-use crate::metadata::{DataType, KvFormat, TablePath};
+use crate::metadata::{KvFormat, RowType, TablePath};
 use crate::record::MemoryLogRecordsArrowBuilder;
 use crate::record::kv::KvRecordBatchBuilder;
 use bytes::Bytes;
 use std::cmp::max;
+use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
 
 #[allow(dead_code)]
@@ -192,7 +193,7 @@ impl ArrowLogWriteBatch {
         table_path: TablePath,
         schema_id: i32,
         arrow_compression_info: ArrowCompressionInfo,
-        row_type: &DataType,
+        row_type: &RowType,
         bucket_id: BucketId,
         create_ms: i64,
         to_append_record_batch: bool,
@@ -249,11 +250,12 @@ impl ArrowLogWriteBatch {
 pub struct KvWriteBatch {
     write_batch: InnerWriteBatch,
     kv_batch_builder: KvRecordBatchBuilder,
-    target_columns: Option<Vec<usize>>,
+    target_columns: Option<Arc<Vec<usize>>>,
     schema_id: i32,
 }
 
 impl KvWriteBatch {
+    pub const DEFAULT_WRITE_LIMIT: usize = 256; // NOTE(review): bytes; a larger record never fits, even in an empty batch - confirm
     #[allow(clippy::too_many_arguments)]
     pub fn new(
         batch_id: i64,
@@ -262,7 +264,7 @@ impl KvWriteBatch {
         write_limit: usize,
         kv_format: KvFormat,
         bucket_id: BucketId,
-        target_columns: Option<Vec<usize>>,
+        target_columns: Option<Arc<Vec<usize>>>,
         create_ms: i64,
     ) -> Self {
         let base = InnerWriteBatch::new(batch_id, table_path, create_ms, 
bucket_id);
@@ -284,7 +286,7 @@ impl KvWriteBatch {
             }
         };
 
-        let key = kv_write_record.key;
+        let key = kv_write_record.key.as_ref();
 
         if self.schema_id != write_record.schema_id {
             return Err(Error::UnexpectedError {
@@ -296,7 +298,7 @@ impl KvWriteBatch {
             });
         };
 
-        if self.target_columns.as_deref() != kv_write_record.target_columns {
+        if self.target_columns != kv_write_record.target_columns {
             return Err(Error::UnexpectedError {
                 message: format!(
                     "target columns {:?} of the write record to append are not 
the same as the current target columns {:?} in the batch.",
@@ -307,14 +309,14 @@ impl KvWriteBatch {
             });
         }
 
-        let row = kv_write_record.compacted_row.as_ref();
+        let row_bytes = kv_write_record.row_bytes();
 
-        if self.is_closed() || !self.kv_batch_builder.has_room_for_row(key, 
row) {
+        if self.is_closed() || !self.kv_batch_builder.has_room_for_row(key, 
row_bytes) {
             Ok(None)
         } else {
             // append successfully
             self.kv_batch_builder
-                .append_row(key, row)
+                .append_row(key, row_bytes)
                 .map_err(|e| Error::UnexpectedError {
                     message: "Failed to append row to 
KvWriteBatch".to_string(),
                     source: Some(Box::new(e)),
diff --git a/crates/fluss/src/client/write/bucket_assigner.rs 
b/crates/fluss/src/client/write/bucket_assigner.rs
index 2370719..817101a 100644
--- a/crates/fluss/src/client/write/bucket_assigner.rs
+++ b/crates/fluss/src/client/write/bucket_assigner.rs
@@ -20,6 +20,7 @@ use crate::cluster::Cluster;
 use crate::error::Error::IllegalArgument;
 use crate::error::Result;
 use crate::metadata::TablePath;
+use bytes::Bytes;
 use rand::Rng;
 use std::sync::atomic::{AtomicI32, Ordering};
 
@@ -28,7 +29,7 @@ pub trait BucketAssigner: Sync + Send {
 
     fn on_new_batch(&self, cluster: &Cluster, prev_bucket_id: i32);
 
-    fn assign_bucket(&self, bucket_key: Option<&[u8]>, cluster: &Cluster) -> 
Result<i32>;
+    fn assign_bucket(&self, bucket_key: Option<&Bytes>, cluster: &Cluster) -> 
Result<i32>;
 }
 
 #[derive(Debug)]
@@ -94,7 +95,7 @@ impl BucketAssigner for StickyBucketAssigner {
         self.next_bucket(cluster, prev_bucket_id);
     }
 
-    fn assign_bucket(&self, _bucket_key: Option<&[u8]>, cluster: &Cluster) -> 
Result<i32> {
+    fn assign_bucket(&self, _bucket_key: Option<&Bytes>, cluster: &Cluster) -> 
Result<i32> {
         let bucket_id = self.current_bucket_id.load(Ordering::Relaxed);
         if bucket_id < 0 {
             Ok(self.next_bucket(cluster, bucket_id))
@@ -139,7 +140,7 @@ impl BucketAssigner for HashBucketAssigner {
         // do nothing
     }
 
-    fn assign_bucket(&self, bucket_key: Option<&[u8]>, _: &Cluster) -> 
Result<i32> {
+    fn assign_bucket(&self, bucket_key: Option<&Bytes>, _: &Cluster) -> 
Result<i32> {
         let key = bucket_key.ok_or_else(|| IllegalArgument {
             message: "no bucket key provided".to_string(),
         })?;
@@ -181,7 +182,7 @@ mod tests {
         let assigner = HashBucketAssigner::new(4, <dyn 
BucketingFunction>::of(None));
         let cluster = Cluster::default();
         let bucket = assigner
-            .assign_bucket(Some(b"key"), &cluster)
+            .assign_bucket(Some(&Bytes::from_static(b"key")), &cluster)
             .expect("bucket");
         assert!((0..4).contains(&bucket));
     }
diff --git a/crates/fluss/src/client/write/mod.rs 
b/crates/fluss/src/client/write/mod.rs
index 248218e..dcc6795 100644
--- a/crates/fluss/src/client/write/mod.rs
+++ b/crates/fluss/src/client/write/mod.rs
@@ -21,9 +21,10 @@ mod batch;
 use crate::client::broadcast::{self as client_broadcast, BatchWriteResult, 
BroadcastOnceReceiver};
 use crate::error::Error;
 use crate::metadata::TablePath;
-use crate::row::{CompactedRow, GenericRow};
+use crate::row::GenericRow;
 pub use accumulator::*;
 use arrow::array::RecordBatch;
+use bytes::Bytes;
 use std::sync::Arc;
 
 pub(crate) mod broadcast;
@@ -40,7 +41,7 @@ pub use writer_client::WriterClient;
 pub struct WriteRecord<'a> {
     record: Record<'a>,
     table_path: Arc<TablePath>,
-    bucket_key: Option<&'a [u8]>,
+    bucket_key: Option<Bytes>,
     schema_id: i32,
     write_format: WriteFormat,
 }
@@ -61,25 +62,43 @@ pub enum LogWriteRecord<'a> {
     RecordBatch(Arc<RecordBatch>),
 }
 
+#[derive(Clone)]
+pub enum RowBytes<'a> { // raw row bytes, either borrowed or owned
+    Borrowed(&'a [u8]), // slice borrowed for the lifetime 'a
+    Owned(Bytes), // `Bytes` buffer carried by the value itself
+}
+
+impl<'a> RowBytes<'a> {
+    pub fn as_slice(&self) -> &[u8] { // same byte view for both variants
+        match self {
+            RowBytes::Borrowed(slice) => slice,
+            RowBytes::Owned(bytes) => bytes.as_ref(),
+        }
+    }
+}
+
 pub struct KvWriteRecord<'a> {
-    // only valid for primary key table
-    key: &'a [u8],
-    target_columns: Option<&'a [usize]>,
-    compacted_row: Option<CompactedRow<'a>>,
+    key: Bytes,
+    target_columns: Option<Arc<Vec<usize>>>,
+    row_bytes: Option<RowBytes<'a>>,
 }
 
 impl<'a> KvWriteRecord<'a> {
     fn new(
-        key: &'a [u8],
-        target_columns: Option<&'a [usize]>,
-        compacted_row: Option<CompactedRow<'a>>,
+        key: Bytes,
+        target_columns: Option<Arc<Vec<usize>>>,
+        row_bytes: Option<RowBytes<'a>>,
     ) -> Self {
         KvWriteRecord {
             key,
             target_columns,
-            compacted_row,
+            row_bytes,
         }
     }
+
+    pub fn row_bytes(&self) -> Option<&[u8]> {
+        self.row_bytes.as_ref().map(|rb| rb.as_slice()) // None if no row set
+    }
 }
 
 impl<'a> WriteRecord<'a> {
@@ -110,17 +129,18 @@ impl<'a> WriteRecord<'a> {
     pub fn for_upsert(
         table_path: Arc<TablePath>,
         schema_id: i32,
-        bucket_key: &'a [u8],
-        key: &'a [u8],
-        target_columns: Option<&'a [usize]>,
-        row: CompactedRow<'a>,
+        key: Bytes,
+        bucket_key: Option<Bytes>,
+        write_format: WriteFormat,
+        target_columns: Option<Arc<Vec<usize>>>,
+        row_bytes: Option<RowBytes<'a>>,
     ) -> Self {
         Self {
-            record: Record::Kv(KvWriteRecord::new(key, target_columns, 
Some(row))),
+            record: Record::Kv(KvWriteRecord::new(key, target_columns, 
row_bytes)),
             table_path,
-            bucket_key: Some(bucket_key),
+            bucket_key,
             schema_id,
-            write_format: WriteFormat::CompactedKv,
+            write_format,
         }
     }
 }
diff --git a/crates/fluss/src/client/write/write_format.rs 
b/crates/fluss/src/client/write/write_format.rs
index 4a0c0d8..147152c 100644
--- a/crates/fluss/src/client/write/write_format.rs
+++ b/crates/fluss/src/client/write/write_format.rs
@@ -20,6 +20,7 @@ use crate::error::Result;
 use crate::metadata::KvFormat;
 use std::fmt::Display;
 
+#[derive(Copy, Clone)]
 pub enum WriteFormat {
     ArrowLog,
     CompactedLog,
diff --git a/crates/fluss/src/client/write/writer_client.rs 
b/crates/fluss/src/client/write/writer_client.rs
index 22e0397..65b04f5 100644
--- a/crates/fluss/src/client/write/writer_client.rs
+++ b/crates/fluss/src/client/write/writer_client.rs
@@ -21,6 +21,7 @@ use crate::client::write::sender::Sender;
 use crate::client::{RecordAccumulator, ResultHandle, WriteRecord};
 use crate::config::Config;
 use crate::metadata::TablePath;
+use bytes::Bytes;
 use dashmap::DashMap;
 use std::sync::Arc;
 use tokio::sync::mpsc;
@@ -90,8 +91,9 @@ impl WriterClient {
     pub async fn send(&self, record: &WriteRecord<'_>) -> Result<ResultHandle> 
{
         let table_path = &record.table_path;
         let cluster = self.metadata.get_cluster();
+        let bucket_key = record.bucket_key.as_ref();
 
-        let (bucket_assigner, bucket_id) = self.assign_bucket(table_path)?;
+        let (bucket_assigner, bucket_id) = self.assign_bucket(bucket_key, 
table_path)?;
 
         let mut result = self
             .accumulate
@@ -101,7 +103,7 @@ impl WriterClient {
         if result.abort_record_for_new_batch {
             let prev_bucket_id = bucket_id;
             bucket_assigner.on_new_batch(&cluster, prev_bucket_id);
-            let bucket_id = bucket_assigner.assign_bucket(None, &cluster)?;
+            let bucket_id = bucket_assigner.assign_bucket(bucket_key, 
&cluster)?;
             result = self
                 .accumulate
                 .append(record, bucket_id, &cluster, false)
@@ -116,6 +118,7 @@ impl WriterClient {
     }
     fn assign_bucket(
         &self,
+        bucket_key: Option<&Bytes>,
         table_path: &Arc<TablePath>,
     ) -> Result<(Arc<Box<dyn BucketAssigner>>, i32)> {
         let cluster = self.metadata.get_cluster();
@@ -129,7 +132,7 @@ impl WriterClient {
                 assigner
             }
         };
-        let bucket_id = bucket_assigner.assign_bucket(None, &cluster)?;
+        let bucket_id = bucket_assigner.assign_bucket(bucket_key, &cluster)?;
         Ok((bucket_assigner, bucket_id))
     }
 
diff --git a/crates/fluss/src/metadata/table.rs 
b/crates/fluss/src/metadata/table.rs
index da85b0c..8204e7c 100644
--- a/crates/fluss/src/metadata/table.rs
+++ b/crates/fluss/src/metadata/table.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 use crate::compression::ArrowCompressionInfo;
-use crate::error::Error::InvalidTableError;
+use crate::error::Error::{IllegalArgument, InvalidTableError};
 use crate::error::{Error, Result};
 use crate::metadata::DataLakeFormat;
 use crate::metadata::datatype::{DataField, DataType, RowType};
@@ -97,8 +97,8 @@ impl PrimaryKey {
 pub struct Schema {
     columns: Vec<Column>,
     primary_key: Option<PrimaryKey>,
-    // must be Row data type kind
-    row_type: DataType,
+    row_type: RowType,
+    auto_increment_col_names: Vec<String>,
 }
 
 impl Schema {
@@ -118,7 +118,7 @@ impl Schema {
         self.primary_key.as_ref()
     }
 
-    pub fn row_type(&self) -> &DataType {
+    pub fn row_type(&self) -> &RowType {
         &self.row_type
     }
 
@@ -144,12 +144,17 @@ impl Schema {
     pub fn column_names(&self) -> Vec<&str> {
         self.columns.iter().map(|c| c.name.as_str()).collect()
     }
+
+    pub fn auto_increment_col_names(&self) -> &Vec<String> {
+        &self.auto_increment_col_names // copied from the builder in build()
+    }
 }
 
 #[derive(Debug, Default)]
 pub struct SchemaBuilder {
     columns: Vec<Column>,
     primary_key: Option<PrimaryKey>,
+    auto_increment_col_names: Vec<String>,
 }
 
 impl SchemaBuilder {
@@ -198,9 +203,36 @@ impl SchemaBuilder {
         self
     }
 
+    /// Marks `column_name` as the table's auto-increment column, i.e. one
+    /// whose value for each new row is the next value of an auto-increment
+    /// sequence. At most one such column is allowed: calling this again
+    /// returns `IllegalArgument`. The name is validated later, in `build`.
+    pub fn enable_auto_increment(mut self, column_name: &str) -> Result<Self> {
+        if !self.auto_increment_col_names.is_empty() {
+            return Err(IllegalArgument {
+                message: "Multiple auto increment columns are not supported 
yet.".to_string(),
+            });
+        }
+
+        self.auto_increment_col_names.push(column_name.to_string());
+        Ok(self)
+    }
+
     pub fn build(&mut self) -> Result<Schema> {
         let columns = Self::normalize_columns(&mut self.columns, 
self.primary_key.as_ref())?;
 
+        let column_names: HashSet<_> = columns.iter().map(|c| 
&c.name).collect();
+        for auto_inc_col in &self.auto_increment_col_names {
+            if !column_names.contains(auto_inc_col) {
+                return Err(IllegalArgument {
+                    message: format!(
+                        "Auto increment column '{}' is not found in the schema 
columns.",
+                        auto_inc_col
+                    ),
+                });
+            }
+        }
+
         let data_fields = columns
             .iter()
             .map(|c| DataField {
@@ -213,7 +245,8 @@ impl SchemaBuilder {
         Ok(Schema {
             columns,
             primary_key: self.primary_key.clone(),
-            row_type: DataType::Row(RowType::new(data_fields)),
+            row_type: RowType::new(data_fields),
+            auto_increment_col_names: self.auto_increment_col_names.clone(),
         })
     }
 
@@ -500,7 +533,7 @@ impl TableDescriptor {
         bucket_keys.retain(|k| !partition_keys.contains(k));
 
         if bucket_keys.is_empty() {
-            return Err(Error::InvalidTableError {
+            return Err(InvalidTableError {
                 message: format!(
                     "Primary Key constraint {:?} should not be same with 
partition fields {:?}.",
                     schema.primary_key().unwrap().column_names(),
@@ -580,7 +613,7 @@ pub enum LogFormat {
 }
 
 impl Display for LogFormat {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
         match self {
             LogFormat::ARROW => {
                 write!(f, "ARROW")?;
@@ -612,7 +645,7 @@ pub enum KvFormat {
 }
 
 impl Display for KvFormat {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
         match self {
             KvFormat::COMPACTED => write!(f, "COMPACTED")?,
             KvFormat::INDEXED => write!(f, "INDEXED")?,
@@ -626,7 +659,7 @@ impl KvFormat {
         match s.to_uppercase().as_str() {
             "INDEXED" => Ok(KvFormat::INDEXED),
             "COMPACTED" => Ok(KvFormat::COMPACTED),
-            _ => Err(Error::InvalidTableError {
+            _ => Err(InvalidTableError {
                 message: format!("Unknown kv format: {s}"),
             }),
         }
@@ -692,7 +725,7 @@ pub struct TableInfo {
     pub table_id: i64,
     pub schema_id: i32,
     pub schema: Schema,
-    pub row_type: DataType,
+    pub row_type: RowType,
     pub primary_keys: Vec<String>,
     pub physical_primary_keys: Vec<String>,
     pub bucket_keys: Vec<String>,
@@ -708,10 +741,7 @@ pub struct TableInfo {
 
 impl TableInfo {
     pub fn row_type(&self) -> &RowType {
-        match &self.row_type {
-            DataType::Row(row_type) => row_type,
-            _ => panic!("should be a row type"),
-        }
+        &self.row_type
     }
 }
 
@@ -847,7 +877,7 @@ impl TableInfo {
         &self.schema
     }
 
-    pub fn get_row_type(&self) -> &DataType {
+    pub fn get_row_type(&self) -> &RowType {
         &self.row_type
     }
 
@@ -946,8 +976,8 @@ impl TableInfo {
     }
 }
 
-impl fmt::Display for TableInfo {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+impl Display for TableInfo {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
         write!(
             f,
             "TableInfo{{ table_path={:?}, table_id={}, schema_id={}, 
schema={:?}, physical_primary_keys={:?}, bucket_keys={:?}, partition_keys={:?}, 
num_buckets={}, properties={:?}, custom_properties={:?}, comment={:?}, 
created_time={}, modified_time={} }}",
@@ -998,7 +1028,7 @@ impl TableBucket {
 }
 
 impl Display for TableBucket {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
         if let Some(partition_id) = self.partition_id {
             write!(
                 f,
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index 3c46f9b..3c94b72 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -18,7 +18,7 @@
 use crate::client::{LogWriteRecord, Record, WriteRecord};
 use crate::compression::ArrowCompressionInfo;
 use crate::error::{Error, Result};
-use crate::metadata::DataType;
+use crate::metadata::{DataType, RowType};
 use crate::record::{ChangeType, ScanRecord};
 use crate::row::{ColumnarRow, GenericRow};
 use arrow::array::{
@@ -48,6 +48,7 @@ use std::{
     sync::Arc,
 };
 
+use crate::error::Error::IllegalArgument;
 use arrow::ipc::writer::IpcWriteOptions;
 /// const for record batch
 pub const BASE_OFFSET_LENGTH: usize = 8;
@@ -171,7 +172,7 @@ pub struct RowAppendRecordBatchBuilder {
 }
 
 impl RowAppendRecordBatchBuilder {
-    pub fn new(row_type: &DataType) -> Self {
+    pub fn new(row_type: &RowType) -> Self {
         let schema_ref = to_arrow_schema(row_type);
         let builders = Mutex::new(
             schema_ref
@@ -251,7 +252,7 @@ impl ArrowRecordBatchInnerBuilder for 
RowAppendRecordBatchBuilder {
 impl MemoryLogRecordsArrowBuilder {
     pub fn new(
         schema_id: i32,
-        row_type: &DataType,
+        row_type: &RowType,
         to_append_record_batch: bool,
         arrow_compression_info: ArrowCompressionInfo,
     ) -> Self {
@@ -329,7 +330,7 @@ impl MemoryLogRecordsArrowBuilder {
         // write arrow batch bytes
         let mut cursor = Cursor::new(&mut batch_bytes[..]);
         cursor.set_position(RECORD_BATCH_HEADER_SIZE as u64);
-        cursor.write_all(real_arrow_batch_bytes).unwrap();
+        cursor.write_all(real_arrow_batch_bytes)?;
 
         let calcute_crc_bytes = &cursor.get_ref()[SCHEMA_ID_OFFSET..];
         // then update crc
@@ -562,16 +563,17 @@ impl LogRecordBatch {
             return 
Ok(RecordBatch::new_empty(read_context.target_schema.clone()));
         }
 
-        let data = self.data.get(RECORDS_OFFSET..).ok_or_else(|| {
-            crate::error::Error::UnexpectedError {
+        let data = self
+            .data
+            .get(RECORDS_OFFSET..)
+            .ok_or_else(|| Error::UnexpectedError {
                 message: format!(
                     "Corrupt log record batch: data length {} is less than 
RECORDS_OFFSET {}",
                     self.data.len(),
                     RECORDS_OFFSET
                 ),
                 source: None,
-            }
-        })?;
+            })?;
         read_context.record_batch(data)
     }
 }
@@ -639,27 +641,20 @@ fn parse_ipc_message(
     Ok((batch_metadata, body_buffer, message.version()))
 }
 
-pub fn to_arrow_schema(fluss_schema: &DataType) -> SchemaRef {
-    match &fluss_schema {
-        DataType::Row(row_type) => {
-            let fields: Vec<Field> = row_type
-                .fields()
-                .iter()
-                .map(|f| {
-                    Field::new(
-                        f.name(),
-                        to_arrow_type(f.data_type()),
-                        f.data_type().is_nullable(),
-                    )
-                })
-                .collect();
+pub fn to_arrow_schema(fluss_schema: &RowType) -> SchemaRef {
+    let fields: Vec<Field> = fluss_schema
+        .fields()
+        .iter()
+        .map(|f| {
+            Field::new(
+                f.name(),
+                to_arrow_type(f.data_type()),
+                f.data_type().is_nullable(),
+            )
+        })
+        .collect();
 
-            SchemaRef::new(arrow_schema::Schema::new(fields))
-        }
-        _ => {
-            panic!("must be row data type.")
-        }
-    }
+    SchemaRef::new(arrow_schema::Schema::new(fields))
 }
 
 pub fn to_arrow_type(fluss_type: &DataType) -> ArrowDataType {
@@ -813,7 +808,7 @@ impl ReadContext {
                 let mut reordering_indexes = 
Vec::with_capacity(projected_fields.len());
                 for &original_idx in &projected_fields {
                     let pos = 
sorted_fields.binary_search(&original_idx).map_err(|_| {
-                        Error::IllegalArgument {
+                        IllegalArgument {
                             message: format!(
                                 "Projection index {original_idx} is invalid 
for the current schema."
                             ),
@@ -857,7 +852,7 @@ impl ReadContext {
         let field_count = schema.fields().len();
         for &index in projected_fields {
             if index >= field_count {
-                return Err(Error::IllegalArgument {
+                return Err(IllegalArgument {
                     message: format!(
                         "Projection index {index} is out of bounds for schema 
with {field_count} fields."
                     ),
@@ -869,7 +864,7 @@ impl ReadContext {
 
     pub fn project_schema(schema: SchemaRef, projected_fields: &[usize]) -> 
Result<SchemaRef> {
         Ok(SchemaRef::new(schema.project(projected_fields).map_err(
-            |e| Error::IllegalArgument {
+            |e| IllegalArgument {
                 message: format!("Invalid projection: {e}"),
             },
         )?))
@@ -1060,7 +1055,6 @@ pub struct MyVec<T>(pub StreamReader<T>);
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::error::Error;
     use crate::metadata::{DataField, DataTypes};
 
     #[test]
@@ -1217,14 +1211,14 @@ mod tests {
 
     #[test]
     fn projection_rejects_out_of_bounds_index() {
-        let row_type = DataTypes::row(vec![
+        let row_type = RowType::new(vec![
             DataField::new("id".to_string(), DataTypes::int(), None),
             DataField::new("name".to_string(), DataTypes::string(), None),
         ]);
         let schema = to_arrow_schema(&row_type);
         let result = ReadContext::with_projection_pushdown(schema, vec![0, 2], 
false);
 
-        assert!(matches!(result, Err(Error::IllegalArgument { .. })));
+        assert!(matches!(result, Err(IllegalArgument { .. })));
     }
 
     #[test]
diff --git a/crates/fluss/src/record/kv/kv_record_batch.rs 
b/crates/fluss/src/record/kv/kv_record_batch.rs
index 32f712f..eb89d69 100644
--- a/crates/fluss/src/record/kv/kv_record_batch.rs
+++ b/crates/fluss/src/record/kv/kv_record_batch.rs
@@ -370,12 +370,12 @@ impl Iterator for KvRecordIterator {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::metadata::{DataTypes, KvFormat, RowType};
+    use crate::metadata::{DataTypes, KvFormat};
     use crate::record::kv::test_util::TestReadContext;
     use crate::record::kv::{CURRENT_KV_MAGIC_VALUE, KvRecordBatchBuilder};
     use crate::row::InternalRow;
     use crate::row::binary::BinaryWriter;
-    use crate::row::compacted::CompactedRow;
+
     use bytes::{BufMut, BytesMut};
 
     #[test]
@@ -417,12 +417,11 @@ mod tests {
         let mut value1_writer = CompactedRowWriter::new(1);
         value1_writer.write_bytes(&[1, 2, 3, 4, 5]);
 
-        let row_type = RowType::with_data_types([DataTypes::bytes()].to_vec());
-        let row = &CompactedRow::from_bytes(&row_type, value1_writer.buffer());
-        builder.append_row(key1, Some(row)).unwrap();
+        let row_bytes = value1_writer.buffer();
+        builder.append_row(key1, Some(row_bytes)).unwrap();
 
         let key2 = b"key2";
-        builder.append_row::<CompactedRow>(key2, None).unwrap();
+        builder.append_row(key2, None).unwrap();
 
         let bytes = builder.build().unwrap();
 
diff --git a/crates/fluss/src/record/kv/kv_record_batch_builder.rs 
b/crates/fluss/src/record/kv/kv_record_batch_builder.rs
index e3da864..0b65500 100644
--- a/crates/fluss/src/record/kv/kv_record_batch_builder.rs
+++ b/crates/fluss/src/record/kv/kv_record_batch_builder.rs
@@ -28,7 +28,6 @@ use crate::record::kv::kv_record_batch::{
     WRITE_CLIENT_ID_OFFSET,
 };
 use crate::record::kv::{CURRENT_KV_MAGIC_VALUE, NO_BATCH_SEQUENCE, 
NO_WRITER_ID};
-use crate::row::BinaryRow;
 use bytes::{Bytes, BytesMut};
 use std::io;
 
@@ -88,14 +87,13 @@ impl KvRecordBatchBuilder {
         }
     }
 
-    /// Check if there is room for a new record containing the given key and 
row.
+    /// Check if there is room for a new record containing the given key and 
row bytes.
     /// If no records have been appended, this always returns true.
-    pub fn has_room_for_row<R: BinaryRow>(&self, key: &[u8], row: Option<&R>) 
-> bool {
-        let value = row.map(|r| r.as_bytes());
-        self.size_in_bytes + KvRecord::size_of(key, value) <= self.write_limit
+    pub fn has_room_for_row(&self, key: &[u8], row_bytes: Option<&[u8]>) -> 
bool {
+        self.size_in_bytes + KvRecord::size_of(key, row_bytes) <= 
self.write_limit
     }
 
-    /// Append a KV record with a row value to the batch.
+    /// Append a KV record with row bytes to the batch.
     ///
     /// Returns an error if:
     /// - The builder has been aborted
@@ -103,7 +101,7 @@ impl KvRecordBatchBuilder {
     /// - Adding this record would exceed the write limit
     /// - The maximum number of records is exceeded
     /// - The KV format is not COMPACTED
-    pub fn append_row<R: BinaryRow>(&mut self, key: &[u8], row: Option<&R>) -> 
io::Result<()> {
+    pub fn append_row(&mut self, key: &[u8], row_bytes: Option<&[u8]>) -> 
io::Result<()> {
         if self.kv_format != KvFormat::COMPACTED {
             return Err(io::Error::new(
                 io::ErrorKind::InvalidInput,
@@ -134,8 +132,7 @@ impl KvRecordBatchBuilder {
             ));
         }
 
-        let value = row.map(|r| r.as_bytes());
-        let record_size = KvRecord::size_of(key, value);
+        let record_size = KvRecord::size_of(key, row_bytes);
         if self.size_in_bytes + record_size > self.write_limit {
             return Err(io::Error::new(
                 io::ErrorKind::WriteZero,
@@ -146,7 +143,7 @@ impl KvRecordBatchBuilder {
             ));
         }
 
-        let record_byte_size = KvRecord::write_to_buf(&mut self.buffer, key, 
value)?;
+        let record_byte_size = KvRecord::write_to_buf(&mut self.buffer, key, 
row_bytes)?;
         debug_assert_eq!(record_byte_size, record_size, "Record size 
mismatch");
 
         self.current_record_number += 1;
@@ -349,12 +346,12 @@ mod tests {
 
         let key1 = b"key1";
         let value1 = create_test_row(b"value1");
-        assert!(builder.has_room_for_row(key1, Some(&value1)));
-        builder.append_row(key1, Some(&value1)).unwrap();
+        assert!(builder.has_room_for_row(key1, Some(value1.as_bytes())));
+        builder.append_row(key1, Some(value1.as_bytes())).unwrap();
 
         let key2 = b"key2";
-        assert!(builder.has_room_for_row::<CompactedRow>(key2, None));
-        builder.append_row::<CompactedRow>(key2, None).unwrap();
+        assert!(builder.has_room_for_row(key2, None));
+        builder.append_row(key2, None).unwrap();
 
         builder.close().unwrap();
         assert!(builder.is_closed());
@@ -369,35 +366,34 @@ mod tests {
         // Test lifecycle: abort behavior
         let mut builder = KvRecordBatchBuilder::new(1, 4096, 
KvFormat::COMPACTED);
         let value = create_test_row(b"value");
-        builder.append_row(b"key", Some(&value)).unwrap();
+        builder.append_row(b"key", Some(value.as_bytes())).unwrap();
         builder.abort();
-        assert!(builder.append_row::<CompactedRow>(b"key2", None).is_err());
+        assert!(builder.append_row(b"key2", None).is_err());
         assert!(builder.build().is_err());
         assert!(builder.close().is_err());
 
         // Test lifecycle: close behavior
         let mut builder = KvRecordBatchBuilder::new(1, 4096, 
KvFormat::COMPACTED);
         let value = create_test_row(b"value");
-        builder.append_row(b"key", Some(&value)).unwrap();
+        builder.append_row(b"key", Some(value.as_bytes())).unwrap();
         builder.close().unwrap();
-        assert!(builder.append_row::<CompactedRow>(b"key2", None).is_err());
+        assert!(builder.append_row(b"key2", None).is_err());
         assert!(builder.build().is_ok());
 
         // Test KvFormat validation
         let mut row_writer = CompactedRowWriter::new(1);
         row_writer.write_int(42);
-        let row_type = RowType::with_data_types(vec![DataTypes::int()]);
-        let row = &CompactedRow::from_bytes(&row_type, row_writer.buffer());
+        let row_bytes = row_writer.buffer();
 
         // INDEXED format should reject append_row
         let mut indexed_builder = KvRecordBatchBuilder::new(1, 4096, 
KvFormat::INDEXED);
-        let result = indexed_builder.append_row(b"key", Some(row));
+        let result = indexed_builder.append_row(b"key", Some(row_bytes));
         assert!(result.is_err());
         assert_eq!(result.unwrap_err().kind(), io::ErrorKind::InvalidInput);
 
         // COMPACTED format should accept append_row
         let mut compacted_builder = KvRecordBatchBuilder::new(1, 4096, 
KvFormat::COMPACTED);
-        let result = compacted_builder.append_row(b"key", Some(row));
+        let result = compacted_builder.append_row(b"key", Some(row_bytes));
         assert!(result.is_ok());
     }
 
@@ -410,15 +406,17 @@ mod tests {
         let large_key = vec![0u8; 1000];
         let large_value = vec![1u8; 1000];
         let large_row = create_test_row(&large_value);
-        assert!(!builder.has_room_for_row(&large_key, Some(&large_row)));
+        assert!(!builder.has_room_for_row(&large_key, 
Some(large_row.as_bytes())));
         let small_value = create_test_row(b"value");
-        assert!(builder.has_room_for_row(b"key", Some(&small_value)));
+        assert!(builder.has_room_for_row(b"key", 
Some(small_value.as_bytes())));
 
         // Test append enforcement - add small record first
-        builder.append_row(b"key", Some(&small_value)).unwrap();
+        builder
+            .append_row(b"key", Some(small_value.as_bytes()))
+            .unwrap();
 
         // Try to add large record that exceeds limit (reuse large_row from 
above)
-        let result = builder.append_row(b"key2", Some(&large_row));
+        let result = builder.append_row(b"key2", Some(large_row.as_bytes()));
         assert!(result.is_err());
         assert_eq!(result.unwrap_err().kind(), io::ErrorKind::WriteZero);
     }
@@ -429,10 +427,12 @@ mod tests {
         builder.current_record_number = i32::MAX - 1;
 
         let value1 = create_test_row(b"value1");
-        builder.append_row(b"key1", Some(&value1)).unwrap();
+        builder
+            .append_row(b"key1", Some(value1.as_bytes()))
+            .unwrap();
 
         let value2 = create_test_row(b"value2");
-        let result = builder.append_row(b"key2", Some(&value2));
+        let result = builder.append_row(b"key2", Some(value2.as_bytes()));
         assert!(result.is_err());
         assert_eq!(result.unwrap_err().kind(), io::ErrorKind::InvalidInput);
     }
@@ -452,13 +452,17 @@ mod tests {
         builder.set_writer_state(100, 5);
 
         let value1 = create_test_row(b"value1");
-        builder.append_row(b"key1", Some(&value1)).unwrap();
+        builder
+            .append_row(b"key1", Some(value1.as_bytes()))
+            .unwrap();
         let bytes1 = builder.build().unwrap();
         let len1 = bytes1.len();
 
         // Append another record - this should invalidate the cache
         let value2 = create_test_row(b"value2");
-        builder.append_row(b"key2", Some(&value2)).unwrap();
+        builder
+            .append_row(b"key2", Some(value2.as_bytes()))
+            .unwrap();
         let bytes2 = builder.build().unwrap();
         let len2 = bytes2.len();
 
@@ -472,7 +476,7 @@ mod tests {
         let mut builder = KvRecordBatchBuilder::new(1, 4096, 
KvFormat::COMPACTED);
         builder.set_writer_state(100, 5);
         let value = create_test_row(b"value");
-        builder.append_row(b"key", Some(&value)).unwrap();
+        builder.append_row(b"key", Some(value.as_bytes())).unwrap();
         let bytes1 = builder.build().unwrap();
 
         // Change writer state - this should invalidate the cache
@@ -494,7 +498,6 @@ mod tests {
     fn test_builder_with_compacted_row_writer() -> crate::error::Result<()> {
         use crate::record::kv::KvRecordBatch;
         use crate::row::InternalRow;
-        use crate::row::compacted::CompactedRow;
 
         let mut builder = KvRecordBatchBuilder::new(1, 100000, 
KvFormat::COMPACTED);
         builder.set_writer_state(100, 5);
@@ -504,26 +507,25 @@ mod tests {
         row_writer1.write_int(42);
         row_writer1.write_string("hello");
 
-        let row_type = RowType::with_data_types([DataTypes::int(), 
DataTypes::string()].to_vec());
-        let row1 = &CompactedRow::from_bytes(&row_type, row_writer1.buffer());
+        let row_bytes1 = row_writer1.buffer();
 
         let key1 = b"key1";
-        assert!(builder.has_room_for_row(key1, Some(row1)));
-        builder.append_row(key1, Some(row1))?;
+        assert!(builder.has_room_for_row(key1, Some(row_bytes1)));
+        builder.append_row(key1, Some(row_bytes1))?;
 
         // Create and append second record
         let mut row_writer2 = CompactedRowWriter::new(2);
         row_writer2.write_int(100);
         row_writer2.write_string("world");
 
-        let row2 = &CompactedRow::from_bytes(&row_type, row_writer2.buffer());
+        let row_bytes2 = row_writer2.buffer();
 
         let key2 = b"key2";
-        builder.append_row(key2, Some(row2))?;
+        builder.append_row(key2, Some(row_bytes2))?;
 
         // Append a deletion record
         let key3 = b"key3";
-        builder.append_row::<CompactedRow>(key3, None)?;
+        builder.append_row(key3, None)?;
 
         // Build and verify
         builder.close()?;
diff --git a/crates/fluss/src/record/kv/kv_record_read_context.rs 
b/crates/fluss/src/record/kv/kv_record_read_context.rs
index fe6c6f0..9236321 100644
--- a/crates/fluss/src/record/kv/kv_record_read_context.rs
+++ b/crates/fluss/src/record/kv/kv_record_read_context.rs
@@ -18,7 +18,7 @@
 //! Default implementation of ReadContext with decoder caching.
 
 use super::ReadContext;
-use crate::error::{Error, Result};
+use crate::error::Result;
 use crate::metadata::{KvFormat, Schema};
 use crate::row::{RowDecoder, RowDecoderFactory};
 use std::collections::HashMap;
@@ -85,20 +85,7 @@ impl ReadContext for KvRecordReadContext {
 
         // Build decoder outside the lock to avoid blocking other threads
         let schema = self.schema_getter.get_schema(schema_id)?;
-        let row_type = match schema.row_type() {
-            crate::metadata::DataType::Row(row_type) => row_type.clone(),
-            other => {
-                return Err(Error::IoUnexpectedError {
-                    message: format!(
-                        "Schema {schema_id} has invalid row type: expected 
Row, got {other:?}"
-                    ),
-                    source: std::io::Error::new(
-                        std::io::ErrorKind::InvalidData,
-                        "Invalid row type",
-                    ),
-                });
-            }
-        };
+        let row_type = schema.row_type().clone();
 
         // Create decoder outside lock
         let decoder = RowDecoderFactory::create(self.kv_format.clone(), 
row_type)?;
diff --git a/crates/fluss/src/row/compacted/compacted_row.rs 
b/crates/fluss/src/row/compacted/compacted_row.rs
index bc68ea1..35d684d 100644
--- a/crates/fluss/src/row/compacted/compacted_row.rs
+++ b/crates/fluss/src/row/compacted/compacted_row.rs
@@ -15,9 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::client::WriteFormat;
 use crate::metadata::RowType;
 use crate::row::compacted::compacted_row_reader::{CompactedRowDeserializer, 
CompactedRowReader};
-use crate::row::{BinaryRow, GenericRow, InternalRow};
+use crate::row::{GenericRow, InternalRow};
 use std::sync::{Arc, OnceLock};
 
 // Reference implementation:
@@ -69,10 +70,8 @@ impl<'a> CompactedRow<'a> {
         self.decoded_row
             .get_or_init(|| self.deserializer.deserialize(&self.reader))
     }
-}
 
-impl BinaryRow for CompactedRow<'_> {
-    fn as_bytes(&self) -> &[u8] {
+    pub fn as_bytes(&self) -> &[u8] {
         self.data
     }
 }
@@ -153,6 +152,14 @@ impl<'a> InternalRow for CompactedRow<'a> {
     fn get_timestamp_ltz(&self, pos: usize, precision: u32) -> 
crate::row::datum::TimestampLtz {
         self.decoded_row().get_timestamp_ltz(pos, precision)
     }
+
+    fn as_encoded_bytes(&self, write_format: WriteFormat) -> Option<&[u8]> {
+        match write_format {
+            WriteFormat::CompactedKv => Some(self.as_bytes()),
+            WriteFormat::ArrowLog => None,
+            WriteFormat::CompactedLog => None,
+        }
+    }
 }
 
 #[cfg(test)]
diff --git a/crates/fluss/src/row/compacted/compacted_row_writer.rs 
b/crates/fluss/src/row/compacted/compacted_row_writer.rs
index d1ad047..ac0100e 100644
--- a/crates/fluss/src/row/compacted/compacted_row_writer.rs
+++ b/crates/fluss/src/row/compacted/compacted_row_writer.rs
@@ -63,6 +63,18 @@ impl CompactedRowWriter {
         Bytes::copy_from_slice(&self.buffer[..self.position])
     }
 
+    /// Flushes the writer's `BytesMut` buffer: splits off the written bytes, resets the 
writer's internal state, and returns the split-off bytes as `Bytes`
+    pub fn flush_bytes(&mut self) -> Bytes {
+        let used = self.buffer.split_to(self.position);
+        self.position = self.header_size_in_bytes;
+        if self.buffer.len() < self.header_size_in_bytes {
+            self.buffer.resize(self.header_size_in_bytes.max(64), 0);
+        } else {
+            self.buffer[..self.header_size_in_bytes].fill(0);
+        }
+        used.freeze()
+    }
+
     fn ensure_capacity(&mut self, need_len: usize) {
         if (self.buffer.len() - self.position) < need_len {
             let new_len = cmp::max(self.buffer.len() * 2, self.buffer.len() + 
need_len);
diff --git a/crates/fluss/src/row/encode/compacted_row_encoder.rs 
b/crates/fluss/src/row/encode/compacted_row_encoder.rs
index 48b9f3f..20f2882 100644
--- a/crates/fluss/src/row/encode/compacted_row_encoder.rs
+++ b/crates/fluss/src/row/encode/compacted_row_encoder.rs
@@ -20,8 +20,9 @@ use crate::error::Result;
 use crate::metadata::RowType;
 use crate::row::Datum;
 use crate::row::binary::{BinaryRowFormat, BinaryWriter, ValueWriter};
-use crate::row::compacted::{CompactedRow, CompactedRowDeserializer, 
CompactedRowWriter};
-use crate::row::encode::{BinaryRow, RowEncoder};
+use crate::row::compacted::{CompactedRowDeserializer, CompactedRowWriter};
+use crate::row::encode::RowEncoder;
+use bytes::Bytes;
 use std::sync::Arc;
 
 #[allow(dead_code)]
@@ -65,12 +66,8 @@ impl RowEncoder for CompactedRowEncoder<'_> {
             .write_value(&mut self.writer, pos, &value)
     }
 
-    fn finish_row(&mut self) -> Result<impl BinaryRow> {
-        Ok(CompactedRow::deserialize(
-            Arc::clone(&self.compacted_row_deserializer),
-            self.arity,
-            self.writer.buffer(),
-        ))
+    fn finish_row(&mut self) -> Result<Bytes> {
+        Ok(self.writer.flush_bytes())
     }
 
     fn close(&mut self) -> Result<()> {
diff --git a/crates/fluss/src/row/encode/mod.rs 
b/crates/fluss/src/row/encode/mod.rs
index c294ecf..468d4d1 100644
--- a/crates/fluss/src/row/encode/mod.rs
+++ b/crates/fluss/src/row/encode/mod.rs
@@ -22,7 +22,7 @@ use crate::error::Result;
 use crate::metadata::{DataLakeFormat, KvFormat, RowType};
 use crate::row::encode::compacted_key_encoder::CompactedKeyEncoder;
 use crate::row::encode::compacted_row_encoder::CompactedRowEncoder;
-use crate::row::{BinaryRow, Datum, InternalRow};
+use crate::row::{Datum, InternalRow};
 use bytes::Bytes;
 
 /// An interface for encoding key of row into bytes.
@@ -31,8 +31,9 @@ pub trait KeyEncoder {
     fn encode_key(&mut self, row: &dyn InternalRow) -> Result<Bytes>;
 }
 
-#[allow(dead_code)]
-impl dyn KeyEncoder {
+pub struct KeyEncoderFactory;
+
+impl KeyEncoderFactory {
     /// Create a key encoder to encode the key bytes of the input row.
     /// # Arguments
     /// * `row_type` - the row type of the input row
@@ -43,23 +44,21 @@ impl dyn KeyEncoder {
     /// key encoder
     pub fn of(
         row_type: &RowType,
-        key_fields: Vec<String>,
-        data_lake_format: Option<DataLakeFormat>,
+        key_fields: &[String],
+        data_lake_format: &Option<DataLakeFormat>,
     ) -> Result<Box<dyn KeyEncoder>> {
         match data_lake_format {
             Some(DataLakeFormat::Paimon) => {
                 unimplemented!("KeyEncoder for Paimon format is currently 
unimplemented")
             }
             Some(DataLakeFormat::Lance) => 
Ok(Box::new(CompactedKeyEncoder::create_key_encoder(
-                row_type,
-                key_fields.as_slice(),
+                row_type, key_fields,
             )?)),
             Some(DataLakeFormat::Iceberg) => {
                 unimplemented!("KeyEncoder for Iceberg format is currently 
unimplemented")
             }
             None => Ok(Box::new(CompactedKeyEncoder::create_key_encoder(
-                row_type,
-                key_fields.as_slice(),
+                row_type, key_fields,
             )?)),
         }
     }
@@ -96,7 +95,7 @@ pub trait RowEncoder {
     ///
     /// # Returns
     /// * the written row
-    fn finish_row(&mut self) -> Result<impl BinaryRow>;
+    fn finish_row(&mut self) -> Result<Bytes>;
 
     /// Closes the row encoder
     ///
@@ -110,8 +109,8 @@ pub struct RowEncoderFactory {}
 
 #[allow(dead_code)]
 impl RowEncoderFactory {
-    pub fn create(kv_format: KvFormat, row_type: &RowType) -> Result<impl 
RowEncoder> {
-        Self::create_for_field_types(kv_format, row_type.clone())
+    pub fn create(kv_format: KvFormat, row_type: RowType) -> Result<impl 
RowEncoder> {
+        Self::create_for_field_types(kv_format, row_type)
     }
 
     pub fn create_for_field_types(
diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs
index d2f640e..bc8134d 100644
--- a/crates/fluss/src/row/mod.rs
+++ b/crates/fluss/src/row/mod.rs
@@ -23,9 +23,11 @@ mod decimal;
 pub mod binary;
 pub mod compacted;
 pub mod encode;
-mod field_getter;
+pub mod field_getter;
 mod row_decoder;
 
+use crate::client::WriteFormat;
+use bytes::Bytes;
 pub use column::*;
 pub use compacted::CompactedRow;
 pub use datum::*;
@@ -33,9 +35,23 @@ pub use decimal::{Decimal, MAX_COMPACT_PRECISION};
 pub use encode::KeyEncoder;
 pub use row_decoder::{CompactedRowDecoder, RowDecoder, RowDecoderFactory};
 
-pub trait BinaryRow: InternalRow {
+pub struct BinaryRow<'a> {
+    data: BinaryDataWrapper<'a>,
+}
+
+pub enum BinaryDataWrapper<'a> {
+    Bytes(Bytes),
+    Ref(&'a [u8]),
+}
+
+impl<'a> BinaryRow<'a> {
     /// Returns the binary representation of this row as a byte slice.
-    fn as_bytes(&self) -> &[u8];
+    pub fn as_bytes(&'a self) -> &'a [u8] {
+        match &self.data {
+            BinaryDataWrapper::Bytes(bytes) => bytes.as_ref(),
+            BinaryDataWrapper::Ref(r) => r,
+        }
+    }
 }
 
 // TODO make functions return Result<?> for better error handling
@@ -99,6 +115,11 @@ pub trait InternalRow {
 
     /// Returns the binary value at the given position
     fn get_bytes(&self, pos: usize) -> &[u8];
+
+    /// Returns encoded bytes if already encoded
+    fn as_encoded_bytes(&self, _write_format: WriteFormat) -> Option<&[u8]> {
+        None
+    }
 }
 
 pub struct GenericRow<'a> {

Reply via email to