This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 81dd6e0  feat: support produce partitioned table (#228)
81dd6e0 is described below

commit 81dd6e05bb2ae2e3bc046c74eac724bb4158d679
Author: yuxia Luo <[email protected]>
AuthorDate: Sat Jan 31 07:54:29 2026 +0800

    feat: support produce partitioned table (#228)
---
 bindings/cpp/src/lib.rs                            |   7 +-
 bindings/python/src/table.rs                       |   6 +-
 crates/examples/src/example_kv_table.rs            |   2 +-
 .../examples/src/example_partitioned_kv_table.rs   |   2 +-
 crates/examples/src/example_table.rs               |   8 +-
 crates/fluss/src/client/table/append.rs            |  55 +++++++--
 crates/fluss/src/client/table/log_fetch_buffer.rs  |   2 +-
 crates/fluss/src/client/table/mod.rs               |   4 +-
 crates/fluss/src/client/table/partition_getter.rs  |  19 ++-
 crates/fluss/src/client/table/scanner.rs           |  13 +--
 crates/fluss/src/client/table/upsert.rs            |  67 ++++++-----
 crates/fluss/src/client/table/writer.rs            |  46 --------
 crates/fluss/src/client/write/accumulator.rs       |  12 +-
 crates/fluss/src/client/write/batch.rs             |   2 +-
 crates/fluss/src/client/write/mod.rs               |   8 +-
 crates/fluss/src/client/write/sender.rs            |  15 +--
 crates/fluss/src/io/storage.rs                     |   2 +
 crates/fluss/src/record/arrow.rs                   |  54 ++++++---
 crates/fluss/src/row/column.rs                     |  36 +++---
 crates/fluss/src/row/compacted/compacted_row.rs    |  17 ++-
 crates/fluss/src/row/mod.rs                        |   2 +-
 crates/fluss/tests/integration/kv_table.rs         |  17 +--
 .../tests/integration/{table.rs => log_table.rs}   | 128 +++++++++++++++++++--
 .../fluss/tests/integration/table_remote_scan.rs   |   7 +-
 crates/fluss/tests/integration/utils.rs            |  26 ++++-
 crates/fluss/tests/test_fluss.rs                   |   2 +-
 26 files changed, 350 insertions(+), 209 deletions(-)

diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index 2d37763..bd38a03 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -508,7 +508,10 @@ impl Table {
             Err(e) => return Err(format!("Failed to create append: {e}")),
         };
 
-        let writer = table_append.create_writer();
+        let writer = match table_append.create_writer() {
+            Ok(w) => w,
+            Err(e) => return Err(format!("Failed to create writer: {e}")),
+        };
         let writer = Box::into_raw(Box::new(AppendWriter { inner: writer }));
         Ok(writer)
     }
@@ -580,7 +583,7 @@ impl AppendWriter {
     fn append(&mut self, row: &ffi::FfiGenericRow) -> ffi::FfiResult {
         let generic_row = types::ffi_row_to_core(row);
 
-        let result = RUNTIME.block_on(async { self.inner.append(generic_row).await });
+        let result = RUNTIME.block_on(async { self.inner.append(&generic_row).await });
 
         match result {
             Ok(_) => ok_result(),
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index 4043350..48f09e7 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -68,7 +68,9 @@ impl FlussTable {
                 .new_append()
                 .map_err(|e| FlussError::new_err(e.to_string()))?;
 
-            let rust_writer = table_append.create_writer();
+            let rust_writer = table_append
+                .create_writer()
+                .map_err(|e| FlussError::new_err(e.to_string()))?;
 
             let py_writer = AppendWriter::from_core(rust_writer, table_info);
 
@@ -251,7 +253,7 @@ impl AppendWriter {
 
         future_into_py(py, async move {
             inner
-                .append(generic_row)
+                .append(&generic_row)
                 .await
                 .map_err(|e| FlussError::new_err(e.to_string()))
         })
diff --git a/crates/examples/src/example_kv_table.rs b/crates/examples/src/example_kv_table.rs
index 032691e..437da06 100644
--- a/crates/examples/src/example_kv_table.rs
+++ b/crates/examples/src/example_kv_table.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 use clap::Parser;
-use fluss::client::{FlussConnection, UpsertWriter};
+use fluss::client::FlussConnection;
 use fluss::config::Config;
 use fluss::error::Result;
 use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
diff --git a/crates/examples/src/example_partitioned_kv_table.rs b/crates/examples/src/example_partitioned_kv_table.rs
index a5e76fa..1b0c303 100644
--- a/crates/examples/src/example_partitioned_kv_table.rs
+++ b/crates/examples/src/example_partitioned_kv_table.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 use clap::Parser;
-use fluss::client::{FlussAdmin, FlussConnection, UpsertWriter};
+use fluss::client::{FlussAdmin, FlussConnection};
 use fluss::config::Config;
 use fluss::error::Result;
 use fluss::metadata::{DataTypes, PartitionSpec, Schema, TableDescriptor, TablePath};
diff --git a/crates/examples/src/example_table.rs b/crates/examples/src/example_table.rs
index 92055a7..6c74e63 100644
--- a/crates/examples/src/example_table.rs
+++ b/crates/examples/src/example_table.rs
@@ -63,13 +63,13 @@ pub async fn main() -> Result<()> {
     row.set_field(2, 123_456_789_123i64);
 
     let table = conn.get_table(&table_path).await?;
-    let append_writer = table.new_append()?.create_writer();
-    let f1 = append_writer.append(row);
-    row = GenericRow::new(3);
+    let append_writer = table.new_append()?.create_writer()?;
+    let f1 = append_writer.append(&row);
+    let mut row = GenericRow::new(3);
     row.set_field(0, 233333);
     row.set_field(1, "tt44");
     row.set_field(2, 987_654_321_987i64);
-    let f2 = append_writer.append(row);
+    let f2 = append_writer.append(&row);
     try_join!(f1, f2, append_writer.flush())?;
 
     // scan rows
diff --git a/crates/fluss/src/client/table/append.rs b/crates/fluss/src/client/table/append.rs
index 7fe2023..ace91a6 100644
--- a/crates/fluss/src/client/table/append.rs
+++ b/crates/fluss/src/client/table/append.rs
@@ -15,16 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::client::table::partition_getter::{PartitionGetter, get_physical_path};
 use crate::client::{WriteRecord, WriterClient};
 use crate::error::Result;
 use crate::metadata::{PhysicalTablePath, TableInfo, TablePath};
-use crate::row::GenericRow;
+use crate::row::{ColumnarRow, InternalRow};
 use arrow::array::RecordBatch;
 use std::sync::Arc;
 
-#[allow(dead_code)]
 pub struct TableAppend {
-    table_path: TablePath,
+    table_path: Arc<TablePath>,
     table_info: Arc<TableInfo>,
     writer_client: Arc<WriterClient>,
 }
@@ -36,32 +36,48 @@ impl TableAppend {
         writer_client: Arc<WriterClient>,
     ) -> Self {
         Self {
-            table_path,
+            table_path: Arc::new(table_path),
             table_info,
             writer_client,
         }
     }
 
-    pub fn create_writer(&self) -> AppendWriter {
-        AppendWriter {
-            physical_table_path: Arc::new(PhysicalTablePath::of(Arc::new(self.table_path.clone()))),
+    pub fn create_writer(&self) -> Result<AppendWriter> {
+        let partition_getter = if self.table_info.is_partitioned() {
+            Some(PartitionGetter::new(
+                self.table_info.row_type(),
+                Arc::clone(self.table_info.get_partition_keys()),
+            )?)
+        } else {
+            None
+        };
+
+        Ok(AppendWriter {
+            table_path: Arc::clone(&self.table_path),
+            partition_getter,
             writer_client: self.writer_client.clone(),
             table_info: Arc::clone(&self.table_info),
-        }
+        })
     }
 }
 
 pub struct AppendWriter {
-    physical_table_path: Arc<PhysicalTablePath>,
+    table_path: Arc<TablePath>,
+    partition_getter: Option<PartitionGetter>,
     writer_client: Arc<WriterClient>,
     table_info: Arc<TableInfo>,
 }
 
 impl AppendWriter {
-    pub async fn append(&self, row: GenericRow<'_>) -> Result<()> {
+    pub async fn append<R: InternalRow>(&self, row: &R) -> Result<()> {
+        let physical_table_path = Arc::new(get_physical_path(
+            &self.table_path,
+            self.partition_getter.as_ref(),
+            row,
+        )?);
         let record = WriteRecord::for_append(
             Arc::clone(&self.table_info),
-            Arc::clone(&self.physical_table_path),
+            physical_table_path,
             self.table_info.schema_id,
             row,
         );
@@ -70,10 +86,25 @@ impl AppendWriter {
         result_handle.result(result)
     }
 
+    /// Appends an Arrow RecordBatch to the table.
+    ///
+    /// For partitioned tables, the partition is derived from the **first row** of the batch.
+    /// Callers must ensure all rows in the batch belong to the same partition.
     pub async fn append_arrow_batch(&self, batch: RecordBatch) -> Result<()> {
+        let physical_table_path = if self.partition_getter.is_some() && batch.num_rows() > 0 {
+            let first_row = ColumnarRow::new(Arc::new(batch.clone()));
+            Arc::new(get_physical_path(
+                &self.table_path,
+                self.partition_getter.as_ref(),
+                &first_row,
+            )?)
+        } else {
+            Arc::new(PhysicalTablePath::of(Arc::clone(&self.table_path)))
+        };
+
         let record = WriteRecord::for_append_record_batch(
             Arc::clone(&self.table_info),
-            Arc::clone(&self.physical_table_path),
+            physical_table_path,
             self.table_info.schema_id,
             batch,
         );
diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs b/crates/fluss/src/client/table/log_fetch_buffer.rs
index 78ee065..b622f19 100644
--- a/crates/fluss/src/client/table/log_fetch_buffer.rs
+++ b/crates/fluss/src/client/table/log_fetch_buffer.rs
@@ -917,7 +917,7 @@ mod tests {
         let mut row = GenericRow::new(2);
         row.set_field(0, 1_i32);
         row.set_field(1, "alice");
-        let record = WriteRecord::for_append(table_info, physical_table_path, 1, row);
+        let record = WriteRecord::for_append(table_info, physical_table_path, 1, &row);
         builder.append(&record)?;
 
         let data = builder.build()?;
diff --git a/crates/fluss/src/client/table/mod.rs b/crates/fluss/src/client/table/mod.rs
index 2fbbbc9..6d54933 100644
--- a/crates/fluss/src/client/table/mod.rs
+++ b/crates/fluss/src/client/table/mod.rs
@@ -31,16 +31,14 @@ mod partition_getter;
 mod remote_log;
 mod scanner;
 mod upsert;
-mod writer;
 
-use crate::client::table::upsert::TableUpsert;
 pub use append::{AppendWriter, TableAppend};
 pub use lookup::{LookupResult, Lookuper, TableLookup};
 pub use remote_log::{
     DEFAULT_SCANNER_REMOTE_LOG_DOWNLOAD_THREADS, DEFAULT_SCANNER_REMOTE_LOG_PREFETCH_NUM,
 };
 pub use scanner::{LogScanner, RecordBatchLogScanner, TableScan};
-pub use writer::{TableWriter, UpsertWriter};
+pub use upsert::{TableUpsert, UpsertWriter};
 
 #[allow(dead_code)]
 pub struct FlussTable<'a> {
diff --git a/crates/fluss/src/client/table/partition_getter.rs b/crates/fluss/src/client/table/partition_getter.rs
index 1a76106..9136801 100644
--- a/crates/fluss/src/client/table/partition_getter.rs
+++ b/crates/fluss/src/client/table/partition_getter.rs
@@ -17,12 +17,29 @@
 
 use crate::error::Error::IllegalArgument;
 use crate::error::Result;
-use crate::metadata::{DataType, ResolvedPartitionSpec, RowType};
+use crate::metadata::{DataType, PhysicalTablePath, ResolvedPartitionSpec, RowType, TablePath};
 use crate::row::InternalRow;
 use crate::row::field_getter::FieldGetter;
 use crate::util::partition;
 use std::sync::Arc;
 
+/// Get the physical table path for a row, handling partitioned vs non-partitioned tables.
+pub fn get_physical_path<R: InternalRow>(
+    table_path: &Arc<TablePath>,
+    partition_getter: Option<&PartitionGetter>,
+    row: &R,
+) -> Result<PhysicalTablePath> {
+    if let Some(getter) = partition_getter {
+        let partition = getter.get_partition(row)?;
+        Ok(PhysicalTablePath::of_partitioned(
+            Arc::clone(table_path),
+            Some(partition),
+        ))
+    } else {
+        Ok(PhysicalTablePath::of(Arc::clone(table_path)))
+    }
+}
+
 /// A getter to get partition name from a row.
 #[allow(dead_code)]
 pub struct PartitionGetter {
diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs
index 14d2841..10e7fff 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -1516,14 +1516,11 @@ mod tests {
             },
         )?;
         let physical_table_path = Arc::new(PhysicalTablePath::of(table_path));
-        let record = WriteRecord::for_append(
-            Arc::new(table_info.clone()),
-            physical_table_path,
-            1,
-            GenericRow {
-                values: vec![Datum::Int32(1)],
-            },
-        );
+        let row = GenericRow {
+            values: vec![Datum::Int32(1)],
+        };
+        let record =
+            WriteRecord::for_append(Arc::new(table_info.clone()), physical_table_path, 1, &row);
         builder.append(&record)?;
         builder.build()
     }
diff --git a/crates/fluss/src/client/table/upsert.rs b/crates/fluss/src/client/table/upsert.rs
index 269d525..0595397 100644
--- a/crates/fluss/src/client/table/upsert.rs
+++ b/crates/fluss/src/client/table/upsert.rs
@@ -15,17 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::client::table::writer::{DeleteResult, TableWriter, UpsertResult, UpsertWriter};
 use crate::client::{RowBytes, WriteFormat, WriteRecord, WriterClient};
 use crate::error::Error::IllegalArgument;
 use crate::error::Result;
-use crate::metadata::{PhysicalTablePath, RowType, TableInfo, TablePath};
+use crate::metadata::{RowType, TableInfo, TablePath};
 use crate::row::InternalRow;
 use crate::row::encode::{KeyEncoder, KeyEncoderFactory, RowEncoder, RowEncoderFactory};
 use crate::row::field_getter::FieldGetter;
 use std::sync::Arc;
 
-use crate::client::table::partition_getter::PartitionGetter;
+use crate::client::table::partition_getter::{PartitionGetter, get_physical_path};
 use bitvec::prelude::bitvec;
 use bytes::Bytes;
 
@@ -98,7 +97,7 @@ impl TableUpsert {
         self.partial_update(Some(valid_col_indices))
     }
 
-    pub fn create_writer(&self) -> Result<impl UpsertWriter> {
+    pub fn create_writer(&self) -> Result<UpsertWriter> {
         UpsertWriterFactory::create(
             Arc::new(self.table_path.clone()),
             Arc::new(self.table_info.clone()),
@@ -108,10 +107,7 @@ impl TableUpsert {
     }
 }
 
-struct UpsertWriterImpl<RE>
-where
-    RE: RowEncoder,
-{
+pub struct UpsertWriter {
     table_path: Arc<TablePath>,
     writer_client: Arc<WriterClient>,
     partition_field_getter: Option<PartitionGetter>,
@@ -120,7 +116,7 @@ where
     // Use primary key encoder as bucket key encoder when None
     bucket_key_encoder: Option<Box<dyn KeyEncoder>>,
     write_format: WriteFormat,
-    row_encoder: RE,
+    row_encoder: Box<dyn RowEncoder>,
     field_getters: Box<[FieldGetter]>,
     table_info: Arc<TableInfo>,
 }
@@ -133,7 +129,7 @@ impl UpsertWriterFactory {
         table_info: Arc<TableInfo>,
         partial_update_columns: Option<Arc<Vec<usize>>>,
         writer_client: Arc<WriterClient>,
-    ) -> Result<impl UpsertWriter> {
+    ) -> Result<UpsertWriter> {
         let data_lake_format = &table_info.table_config.get_datalake_format()?;
         let row_type = table_info.row_type();
         let physical_pks = table_info.get_physical_primary_keys();
@@ -173,7 +169,7 @@ impl UpsertWriterFactory {
             None
         };
 
-        Ok(UpsertWriterImpl {
+        Ok(UpsertWriter {
             table_path,
             partition_field_getter,
             writer_client,
@@ -181,7 +177,7 @@ impl UpsertWriterFactory {
             target_columns: partial_update_columns,
             bucket_key_encoder,
             write_format,
-            row_encoder: RowEncoderFactory::create(kv_format, row_type.clone())?,
+            row_encoder: Box::new(RowEncoderFactory::create(kv_format, row_type.clone())?),
             field_getters,
             table_info: table_info.clone(),
         })
@@ -283,8 +279,7 @@ impl UpsertWriterFactory {
     }
 }
 
-#[allow(dead_code)]
-impl<RE: RowEncoder> UpsertWriterImpl<RE> {
+impl UpsertWriter {
     fn check_field_count<R: InternalRow>(&self, row: &R) -> Result<()> {
         let expected = self.table_info.get_row_type().fields().len();
         if row.get_field_count() != expected {
@@ -317,31 +312,15 @@ impl<RE: RowEncoder> UpsertWriterImpl<RE> {
         self.row_encoder.finish_row()
     }
 
-    fn get_physical_path<R: InternalRow>(&self, row: &R) -> Result<PhysicalTablePath> {
-        if let Some(partition_getter) = &self.partition_field_getter {
-            let partition = partition_getter.get_partition(row);
-            Ok(PhysicalTablePath::of_partitioned(
-                Arc::clone(&self.table_path),
-                Some(partition?),
-            ))
-        } else {
-            Ok(PhysicalTablePath::of(Arc::clone(&self.table_path)))
-        }
-    }
-}
-
-impl<RE: RowEncoder> TableWriter for UpsertWriterImpl<RE> {
     /// Flush data written that have not yet been sent to the server, forcing the client to send the
     /// requests to server and blocks on the completion of the requests associated with these
     /// records. A request is considered completed when it is successfully acknowledged according to
     /// the CLIENT_WRITER_ACKS configuration option you have specified or else it
     /// results in an error.
-    async fn flush(&self) -> Result<()> {
+    pub async fn flush(&self) -> Result<()> {
         self.writer_client.flush().await
     }
-}
 
-impl<RE: RowEncoder> UpsertWriter for UpsertWriterImpl<RE> {
     /// Inserts row into Fluss table if they do not already exist, or updates them if they do exist.
     ///
     /// # Arguments
@@ -349,7 +328,7 @@ impl<RE: RowEncoder> UpsertWriter for UpsertWriterImpl<RE> {
     ///
     /// # Returns
     /// Ok(UpsertResult) when completed normally
-    async fn upsert<R: InternalRow>(&mut self, row: &R) -> Result<UpsertResult> {
+    pub async fn upsert<R: InternalRow>(&mut self, row: &R) -> Result<UpsertResult> {
         self.check_field_count(row)?;
 
         let (key, bucket_key) = self.get_keys(row)?;
@@ -361,7 +340,11 @@ impl<RE: RowEncoder> UpsertWriter for UpsertWriterImpl<RE> {
 
         let write_record = WriteRecord::for_upsert(
             Arc::clone(&self.table_info),
-            Arc::new(self.get_physical_path(row)?),
+            Arc::new(get_physical_path(
+                &self.table_path,
+                self.partition_field_getter.as_ref(),
+                row,
+            )?),
             self.table_info.schema_id,
             key,
             bucket_key,
@@ -384,14 +367,18 @@ impl<RE: RowEncoder> UpsertWriter for UpsertWriterImpl<RE> {
     ///
     /// # Returns
     /// Ok(DeleteResult) when completed normally
-    async fn delete<R: InternalRow>(&mut self, row: &R) -> Result<DeleteResult> {
+    pub async fn delete<R: InternalRow>(&mut self, row: &R) -> Result<DeleteResult> {
         self.check_field_count(row)?;
 
         let (key, bucket_key) = self.get_keys(row)?;
 
         let write_record = WriteRecord::for_upsert(
             Arc::clone(&self.table_info),
-            Arc::new(self.get_physical_path(row)?),
+            Arc::new(get_physical_path(
+                &self.table_path,
+                self.partition_field_getter.as_ref(),
+                row,
+            )?),
             self.table_info.schema_id,
             key,
             bucket_key,
@@ -537,3 +524,13 @@ mod tests {
         ));
     }
 }
+
+/// The result of upserting a record
+/// Currently this is an empty struct to allow for compatible evolution in the future
+#[derive(Default)]
+pub struct UpsertResult;
+
+/// The result of deleting a record
+/// Currently this is an empty struct to allow for compatible evolution in the future
+#[derive(Default)]
+pub struct DeleteResult;
diff --git a/crates/fluss/src/client/table/writer.rs b/crates/fluss/src/client/table/writer.rs
deleted file mode 100644
index ec26ec6..0000000
--- a/crates/fluss/src/client/table/writer.rs
+++ /dev/null
@@ -1,46 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use crate::row::{GenericRow, InternalRow};
-
-use crate::error::Result;
-
-#[allow(dead_code, async_fn_in_trait)]
-pub trait TableWriter {
-    async fn flush(&self) -> Result<()>;
-}
-
-#[allow(dead_code)]
-pub trait AppendWriter: TableWriter {
-    async fn append(&self, row: GenericRow) -> Result<()>;
-}
-
-#[allow(dead_code, async_fn_in_trait)]
-pub trait UpsertWriter: TableWriter {
-    async fn upsert<R: InternalRow>(&mut self, row: &R) -> Result<UpsertResult>;
-    async fn delete<R: InternalRow>(&mut self, row: &R) -> Result<DeleteResult>;
-}
-
-/// The result of upserting a record
-/// Currently this is an empty struct to allow for compatible evolution in the future
-#[derive(Default)]
-pub struct UpsertResult;
-
-/// The result of deleting a record
-/// Currently this is an empty struct to allow for compatible evolution in the future
-#[derive(Default)]
-pub struct DeleteResult;
diff --git a/crates/fluss/src/client/write/accumulator.rs b/crates/fluss/src/client/write/accumulator.rs
index 2a45517..5eae868 100644
--- a/crates/fluss/src/client/write/accumulator.rs
+++ b/crates/fluss/src/client/write/accumulator.rs
@@ -599,14 +599,10 @@ mod tests {
         let physical_table_path = Arc::new(PhysicalTablePath::of(Arc::new(table_path.clone())));
         let table_info = Arc::new(build_table_info(table_path.clone(), 1, 1));
         let cluster = Arc::new(build_cluster(&table_path, 1, 1));
-        let record = WriteRecord::for_append(
-            table_info,
-            physical_table_path,
-            1,
-            GenericRow {
-                values: vec![Datum::Int32(1)],
-            },
-        );
+        let row = GenericRow {
+            values: vec![Datum::Int32(1)],
+        };
+        let record = WriteRecord::for_append(table_info, physical_table_path, 1, &row);
 
         accumulator.append(&record, 0, &cluster, false).await?;
 
diff --git a/crates/fluss/src/client/write/batch.rs b/crates/fluss/src/client/write/batch.rs
index da30c8a..c765473 100644
--- a/crates/fluss/src/client/write/batch.rs
+++ b/crates/fluss/src/client/write/batch.rs
@@ -426,7 +426,7 @@ mod tests {
                     Arc::clone(&table_info),
                     Arc::clone(&physical_table_path),
                     1,
-                    row,
+                    &row,
                 );
                 batch.try_append(&record).unwrap();
             }
diff --git a/crates/fluss/src/client/write/mod.rs b/crates/fluss/src/client/write/mod.rs
index 868b582..25a0db6 100644
--- a/crates/fluss/src/client/write/mod.rs
+++ b/crates/fluss/src/client/write/mod.rs
@@ -22,7 +22,7 @@ use crate::client::broadcast::{self as client_broadcast, BatchWriteResult, Broad
 use crate::error::Error;
 use crate::metadata::{PhysicalTablePath, TableInfo};
 
-use crate::row::GenericRow;
+use crate::row::InternalRow;
 pub use accumulator::*;
 use arrow::array::RecordBatch;
 use bytes::Bytes;
@@ -64,7 +64,7 @@ pub enum Record<'a> {
 }
 
 pub enum LogWriteRecord<'a> {
-    Generic(GenericRow<'a>),
+    InternalRow(&'a dyn InternalRow),
     RecordBatch(Arc<RecordBatch>),
 }
 
@@ -112,11 +112,11 @@ impl<'a> WriteRecord<'a> {
         table_info: Arc<TableInfo>,
         physical_table_path: Arc<PhysicalTablePath>,
         schema_id: i32,
-        row: GenericRow<'a>,
+        row: &'a dyn InternalRow,
     ) -> Self {
         Self {
             table_info,
-            record: Record::Log(LogWriteRecord::Generic(row)),
+            record: Record::Log(LogWriteRecord::InternalRow(row)),
             physical_table_path,
             bucket_key: None,
             schema_id,
diff --git a/crates/fluss/src/client/write/sender.rs b/crates/fluss/src/client/write/sender.rs
index 6a7dad0..f336d0c 100644
--- a/crates/fluss/src/client/write/sender.rs
+++ b/crates/fluss/src/client/write/sender.rs
@@ -103,8 +103,7 @@ impl Sender {
                         if api_error.code == FlussError::PartitionNotExists.code() =>
                     {
                         warn!(
-                            "Partition does not exist during metadata update, continuing: {}",
-                            api_error
+                            "Partition does not exist during metadata update, continuing: {api_error}"
                         );
                     }
                     _ => return Err(e),
@@ -664,14 +663,10 @@ mod tests {
     ) -> Result<(ReadyWriteBatch, crate::client::ResultHandle)> {
         let table_info = Arc::new(build_table_info(table_path.as_ref().clone(), 1, 1));
         let physical_table_path = Arc::new(PhysicalTablePath::of(table_path));
-        let record = WriteRecord::for_append(
-            table_info,
-            physical_table_path,
-            1,
-            GenericRow {
-                values: vec![Datum::Int32(1)],
-            },
-        );
+        let row = GenericRow {
+            values: vec![Datum::Int32(1)],
+        };
+        let record = WriteRecord::for_append(table_info, physical_table_path, 1, &row);
         let result = accumulator.append(&record, 0, &cluster, false).await?;
         let result_handle = result.result_handle.expect("result handle");
         let server = cluster.get_tablet_server(1).expect("server");
diff --git a/crates/fluss/src/io/storage.rs b/crates/fluss/src/io/storage.rs
index a370861..a573517 100644
--- a/crates/fluss/src/io/storage.rs
+++ b/crates/fluss/src/io/storage.rs
@@ -19,6 +19,8 @@ use crate::error;
 use crate::error::Result;
 use crate::io::FileIOBuilder;
 use opendal::{Operator, Scheme};
+#[cfg(any(feature = "storage-s3", feature = "storage-oss"))]
+use std::collections::HashMap;
 
 /// The storage carries all supported storage services in fluss
 #[derive(Debug)]
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index 726106b..b798896 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -20,7 +20,8 @@ use crate::compression::ArrowCompressionInfo;
 use crate::error::{Error, Result};
 use crate::metadata::{DataType, RowType};
 use crate::record::{ChangeType, ScanRecord};
-use crate::row::{ColumnarRow, GenericRow};
+use crate::row::field_getter::FieldGetter;
+use crate::row::{ColumnarRow, InternalRow};
 use arrow::array::{
     ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder,
     Float32Builder, Float64Builder, Int8Builder, Int16Builder, Int32Builder, Int64Builder,
@@ -166,7 +167,7 @@ pub struct MemoryLogRecordsArrowBuilder {
 pub trait ArrowRecordBatchInnerBuilder: Send + Sync {
     fn build_arrow_record_batch(&mut self) -> Result<Arc<RecordBatch>>;
 
-    fn append(&mut self, row: &GenericRow) -> Result<bool>;
+    fn append(&mut self, row: &dyn InternalRow) -> Result<bool>;
 
     fn append_batch(&mut self, record_batch: Arc<RecordBatch>) -> Result<bool>;
 
@@ -191,7 +192,7 @@ impl ArrowRecordBatchInnerBuilder for PrebuiltRecordBatchBuilder {
         Ok(self.arrow_record_batch.as_ref().unwrap().clone())
     }
 
-    fn append(&mut self, _row: &GenericRow) -> Result<bool> {
+    fn append(&mut self, _row: &dyn InternalRow) -> Result<bool> {
         // append one single row is not supported, return false directly
         Ok(false)
     }
@@ -229,6 +230,7 @@ impl ArrowRecordBatchInnerBuilder for PrebuiltRecordBatchBuilder {
 pub struct RowAppendRecordBatchBuilder {
     table_schema: SchemaRef,
     arrow_column_builders: Vec<Box<dyn ArrayBuilder>>,
+    field_getters: Box<[FieldGetter]>,
     records_count: i32,
 }
 
@@ -240,9 +242,11 @@ impl RowAppendRecordBatchBuilder {
             .iter()
             .map(|field| Self::create_builder(field.data_type()))
             .collect();
+        let field_getters = FieldGetter::create_field_getters(row_type);
         Ok(Self {
             table_schema: schema_ref.clone(),
             arrow_column_builders: builders?,
+            field_getters,
             records_count: 0,
         })
     }
@@ -346,11 +350,18 @@ impl ArrowRecordBatchInnerBuilder for RowAppendRecordBatchBuilder {
         )?))
     }
 
-    fn append(&mut self, row: &GenericRow) -> Result<bool> {
-        for (idx, value) in row.values.iter().enumerate() {
+    fn append(&mut self, row: &dyn InternalRow) -> Result<bool> {
+        for (idx, getter) in self.field_getters.iter().enumerate() {
+            let datum = getter.get_field(row);
             let field_type = self.table_schema.field(idx).data_type();
-            let builder = self.arrow_column_builders.get_mut(idx).unwrap();
-            value.append_to(builder.as_mut(), field_type)?;
+            let builder =
+                self.arrow_column_builders
+                    .get_mut(idx)
+                    .ok_or_else(|| Error::UnexpectedError {
+                        message: format!("Column builder at index {idx} not found."),
+                        source: None,
+                    })?;
+            datum.append_to(builder, field_type)?;
         }
         self.records_count += 1;
         Ok(true)
@@ -412,7 +423,9 @@ impl MemoryLogRecordsArrowBuilder {
     pub fn append(&mut self, record: &WriteRecord) -> Result<bool> {
         match &record.record() {
             Record::Log(log_write_record) => match log_write_record {
-                LogWriteRecord::Generic(row) => Ok(self.arrow_record_batch_builder.append(row)?),
+                LogWriteRecord::InternalRow(row) => {
+                    Ok(self.arrow_record_batch_builder.append(*row)?)
+                }
                 LogWriteRecord::RecordBatch(record_batch) => Ok(self
                     .arrow_record_batch_builder
                     .append_batch(record_batch.clone())?),
@@ -1715,9 +1728,10 @@ mod tests {
         )]);
         let mut builder = RowAppendRecordBatchBuilder::new(&row_type)?;
         let decimal = Decimal::from_big_decimal(BigDecimal::from_str("123.456").unwrap(), 10, 3)?;
-        builder.append(&GenericRow {
+        let row = GenericRow {
             values: vec![Datum::Decimal(decimal)],
-        })?;
+        };
+        builder.append(&row)?;
         let batch = builder.build_arrow_record_batch()?;
         let array = batch
             .column(0)
@@ -1735,9 +1749,10 @@ mod tests {
         )]);
         let mut builder = RowAppendRecordBatchBuilder::new(&row_type)?;
         let decimal = Decimal::from_big_decimal(BigDecimal::from_str("123456.78").unwrap(), 10, 2)?;
-        let result = builder.append(&GenericRow {
+        let row = GenericRow {
             values: vec![Datum::Decimal(decimal)],
-        });
+        };
+        let result = builder.append(&row);
         assert!(result.is_err());
         assert!(
             result
@@ -1832,7 +1847,7 @@ mod tests {
         let mut builder = RowAppendRecordBatchBuilder::new(&row_type)?;
 
         // Append rows with various data types
-        builder.append(&GenericRow {
+        let row = GenericRow {
             values: vec![
                 Datum::Int32(1),
                 Datum::Decimal(Decimal::from_big_decimal(
@@ -1851,7 +1866,8 @@ mod tests {
                 // 1609459200000 ms = 2021-01-01 00:00:00 UTC, with 987654 additional nanoseconds
                 Datum::TimestampLtz(TimestampLtz::from_millis_nanos(1609459200000, 987654)?),
             ],
-        })?;
+        };
+        builder.append(&row)?;
 
         let batch = builder.build_arrow_record_batch()?;
 
@@ -1959,15 +1975,19 @@ mod tests {
         let mut row = GenericRow::new(2);
         row.set_field(0, 1_i32);
         row.set_field(1, "alice");
-        let record =
-            WriteRecord::for_append(Arc::clone(&table_info), physical_table_path.clone(), 1, row);
+        let record = WriteRecord::for_append(
+            Arc::clone(&table_info),
+            physical_table_path.clone(),
+            1,
+            &row,
+        );
         builder.append(&record)?;
 
         let mut row2 = GenericRow::new(2);
         row2.set_field(0, 2_i32);
         row2.set_field(1, "bob");
         let record2 =
-            WriteRecord::for_append(Arc::clone(&table_info), physical_table_path, 2, row2);
+            WriteRecord::for_append(Arc::clone(&table_info), physical_table_path, 2, &row2);
         builder.append(&record2)?;
 
         let data = builder.build()?;
diff --git a/crates/fluss/src/row/column.rs b/crates/fluss/src/row/column.rs
index f48075b..50db32b 100644
--- a/crates/fluss/src/row/column.rs
+++ b/crates/fluss/src/row/column.rs
@@ -248,6 +248,24 @@ impl InternalRow for ColumnarRow {
             .value(self.row_id)
     }
 
+    fn get_char(&self, pos: usize, _length: usize) -> &str {
+        self.record_batch
+            .column(pos)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .expect("Expected String array for char type")
+            .value(self.row_id)
+    }
+
+    fn get_string(&self, pos: usize) -> &str {
+        self.record_batch
+            .column(pos)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .expect("Expected String array.")
+            .value(self.row_id)
+    }
+
     fn get_decimal(&self, pos: usize, precision: usize, scale: usize) -> crate::row::Decimal {
         use arrow::datatypes::DataType;
 
@@ -327,24 +345,6 @@ impl InternalRow for ColumnarRow {
         )
     }
 
-    fn get_char(&self, pos: usize, _length: usize) -> &str {
-        self.record_batch
-            .column(pos)
-            .as_any()
-            .downcast_ref::<StringArray>()
-            .expect("Expected String array for char type")
-            .value(self.row_id)
-    }
-
-    fn get_string(&self, pos: usize) -> &str {
-        self.record_batch
-            .column(pos)
-            .as_any()
-            .downcast_ref::<StringArray>()
-            .expect("Expected String array.")
-            .value(self.row_id)
-    }
-
     fn get_binary(&self, pos: usize, _length: usize) -> &[u8] {
         self.record_batch
             .column(pos)
diff --git a/crates/fluss/src/row/compacted/compacted_row.rs b/crates/fluss/src/row/compacted/compacted_row.rs
index 35d684d..2322207 100644
--- a/crates/fluss/src/row/compacted/compacted_row.rs
+++ b/crates/fluss/src/row/compacted/compacted_row.rs
@@ -76,7 +76,6 @@ impl<'a> CompactedRow<'a> {
     }
 }
 
-#[allow(dead_code)]
 impl<'a> InternalRow for CompactedRow<'a> {
     fn get_field_count(&self) -> usize {
         self.arity
@@ -125,14 +124,6 @@ impl<'a> InternalRow for CompactedRow<'a> {
         self.decoded_row().get_string(pos)
     }
 
-    fn get_binary(&self, pos: usize, length: usize) -> &[u8] {
-        self.decoded_row().get_binary(pos, length)
-    }
-
-    fn get_bytes(&self, pos: usize) -> &[u8] {
-        self.decoded_row().get_bytes(pos)
-    }
-
     fn get_decimal(&self, pos: usize, precision: usize, scale: usize) -> crate::row::Decimal {
         self.decoded_row().get_decimal(pos, precision, scale)
     }
@@ -153,6 +144,14 @@ impl<'a> InternalRow for CompactedRow<'a> {
         self.decoded_row().get_timestamp_ltz(pos, precision)
     }
 
+    fn get_binary(&self, pos: usize, length: usize) -> &[u8] {
+        self.decoded_row().get_binary(pos, length)
+    }
+
+    fn get_bytes(&self, pos: usize) -> &[u8] {
+        self.decoded_row().get_bytes(pos)
+    }
+
     fn as_encoded_bytes(&self, write_format: WriteFormat) -> Option<&[u8]> {
         match write_format {
             WriteFormat::CompactedKv => Some(self.as_bytes()),
diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs
index f7c8bec..276dcca 100644
--- a/crates/fluss/src/row/mod.rs
+++ b/crates/fluss/src/row/mod.rs
@@ -55,7 +55,7 @@ impl<'a> BinaryRow<'a> {
 }
 
 // TODO make functions return Result<?> for better error handling
-pub trait InternalRow {
+pub trait InternalRow: Send + Sync {
     /// Returns the number of fields in this row
     fn get_field_count(&self) -> usize;
 
diff --git a/crates/fluss/tests/integration/kv_table.rs b/crates/fluss/tests/integration/kv_table.rs
index b2263c2..3691d65 100644
--- a/crates/fluss/tests/integration/kv_table.rs
+++ b/crates/fluss/tests/integration/kv_table.rs
@@ -34,9 +34,10 @@ static SHARED_FLUSS_CLUSTER: LazyLock<Arc<RwLock<Option<FlussTestingCluster>>>>
 mod kv_table_test {
     use super::SHARED_FLUSS_CLUSTER;
     use crate::integration::fluss_cluster::FlussTestingCluster;
-    use crate::integration::utils::{create_table, get_cluster, start_cluster, stop_cluster};
-    use fluss::client::UpsertWriter;
-    use fluss::metadata::{DataTypes, PartitionSpec, Schema, TableDescriptor, TablePath};
+    use crate::integration::utils::{
+        create_partitions, create_table, get_cluster, start_cluster, stop_cluster,
+    };
+    use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
     use fluss::row::{GenericRow, InternalRow};
     use std::sync::Arc;
 
@@ -467,15 +468,7 @@ mod kv_table_test {
         create_table(&admin, &table_path, &table_descriptor).await;
 
         // Create partitions for each region before inserting data
-        for region in &["US", "EU", "APAC"] {
-            let mut partition_map = std::collections::HashMap::new();
-            partition_map.insert("region".to_string(), region.to_string());
-            let partition_spec = PartitionSpec::new(partition_map);
-            admin
-                .create_partition(&table_path, &partition_spec, false)
-                .await
-                .expect("Failed to create partition");
-        }
+        create_partitions(&admin, &table_path, "region", &["US", "EU", "APAC"]).await;
 
         let connection = cluster.get_fluss_connection().await;
 
diff --git a/crates/fluss/tests/integration/table.rs b/crates/fluss/tests/integration/log_table.rs
similarity index 90%
rename from crates/fluss/tests/integration/table.rs
rename to crates/fluss/tests/integration/log_table.rs
index 6a15674..64e6289 100644
--- a/crates/fluss/tests/integration/table.rs
+++ b/crates/fluss/tests/integration/log_table.rs
@@ -34,7 +34,9 @@ static SHARED_FLUSS_CLUSTER: LazyLock<Arc<RwLock<Option<FlussTestingCluster>>>>
 mod table_test {
     use super::SHARED_FLUSS_CLUSTER;
     use crate::integration::fluss_cluster::FlussTestingCluster;
-    use crate::integration::utils::{create_table, get_cluster, start_cluster, stop_cluster};
+    use crate::integration::utils::{
+        create_partitions, create_table, get_cluster, start_cluster, stop_cluster,
+    };
     use arrow::array::record_batch;
     use fluss::client::{FlussTable, TableScan};
     use fluss::metadata::{DataTypes, Schema, TableBucket, TableDescriptor, TablePath};
@@ -44,6 +46,8 @@ mod table_test {
     use jiff::Timestamp;
     use std::collections::HashMap;
     use std::sync::Arc;
+    use std::sync::atomic::AtomicUsize;
+    use std::sync::atomic::Ordering;
     use std::time::Duration;
 
     fn before_all() {
@@ -91,7 +95,8 @@ mod table_test {
         let append_writer = table
             .new_append()
             .expect("Failed to create append")
-            .create_writer();
+            .create_writer()
+            .expect("Failed to create writer");
 
         let batch1 =
             record_batch!(("c1", Int32, [1, 2, 3]), ("c2", Utf8, ["a1", "a2", "a3"])).unwrap();
@@ -217,7 +222,8 @@ mod table_test {
             .expect("Failed to get table")
             .new_append()
             .expect("Failed to create append")
-            .create_writer();
+            .create_writer()
+            .expect("Failed to create writer");
 
         let batch = record_batch!(
             ("id", Int32, [1, 2, 3]),
@@ -314,7 +320,8 @@ mod table_test {
         let append_writer = table
             .new_append()
             .expect("Failed to create append")
-            .create_writer();
+            .create_writer()
+            .expect("Failed to create writer");
 
         let batch = record_batch!(
             ("col_a", Int32, [1, 2, 3]),
@@ -472,7 +479,7 @@ mod table_test {
                 .is_empty()
         );
 
-        let writer = table.new_append().unwrap().create_writer();
+        let writer = table.new_append().unwrap().create_writer().unwrap();
         writer
             .append_arrow_batch(
                 record_batch!(("id", Int32, [1, 2]), ("name", Utf8, ["a", "b"])).unwrap(),
@@ -676,7 +683,8 @@ mod table_test {
         let append_writer = table
             .new_append()
             .expect("Failed to create append")
-            .create_writer();
+            .create_writer()
+            .expect("Failed to create writer");
 
         // Test data for all datatypes
         let col_tinyint = 127i8;
@@ -749,7 +757,7 @@ mod table_test {
         row.set_field(27, col_timestamp_ltz_ns_neg.clone());
 
         append_writer
-            .append(row)
+            .append(&row)
             .await
             .expect("Failed to append row with all datatypes");
 
@@ -760,7 +768,7 @@ mod table_test {
         }
 
         append_writer
-            .append(row_with_nulls)
+            .append(&row_with_nulls)
             .await
             .expect("Failed to append row with nulls");
 
@@ -967,4 +975,108 @@ mod table_test {
             .await
             .expect("Failed to drop table");
     }
+
+    #[tokio::test]
+    async fn partitioned_table_append() {
+        let cluster = get_fluss_cluster();
+        let connection = cluster.get_fluss_connection().await;
+
+        let admin = connection.get_admin().await.expect("Failed to get admin");
+
+        let table_path = TablePath::new(
+            "fluss".to_string(),
+            "test_partitioned_log_append".to_string(),
+        );
+
+        // Create a partitioned log table
+        let table_descriptor = TableDescriptor::builder()
+            .schema(
+                Schema::builder()
+                    .column("id", DataTypes::int())
+                    .column("region", DataTypes::string())
+                    .column("value", DataTypes::bigint())
+                    .build()
+                    .expect("Failed to build schema"),
+            )
+            .partitioned_by(vec!["region".to_string()])
+            .build()
+            .expect("Failed to build table");
+
+        create_table(&admin, &table_path, &table_descriptor).await;
+
+        // Create partitions
+        create_partitions(&admin, &table_path, "region", &["US", "EU"]).await;
+
+        // Wait for partitions to be available
+        tokio::time::sleep(Duration::from_secs(2)).await;
+
+        let table = connection
+            .get_table(&table_path)
+            .await
+            .expect("Failed to get table");
+
+        // Create append writer - this should now work for partitioned tables
+        let append_writer = table
+            .new_append()
+            .expect("Failed to create append")
+            .create_writer()
+            .expect("Failed to create writer");
+
+        // Append records with different partitions
+        let test_data = [
+            (1, "US", 100i64),
+            (2, "US", 200i64),
+            (3, "EU", 300i64),
+            (4, "EU", 400i64),
+        ];
+
+        for (id, region, value) in &test_data {
+            let mut row = fluss::row::GenericRow::new(3);
+            row.set_field(0, *id);
+            row.set_field(1, *region);
+            row.set_field(2, *value);
+            append_writer
+                .append(&row)
+                .await
+                .expect("Failed to append row");
+        }
+
+        append_writer.flush().await.expect("Failed to flush");
+
+        // Test append_arrow_batch for partitioned tables
+        // Each batch must contain rows from the same partition
+        let us_batch = record_batch!(
+            ("id", Int32, [5, 6]),
+            ("region", Utf8, ["US", "US"]),
+            ("value", Int64, [500, 600])
+        )
+        .unwrap();
+        append_writer
+            .append_arrow_batch(us_batch)
+            .await
+            .expect("Failed to append US batch");
+
+        let eu_batch = record_batch!(
+            ("id", Int32, [7, 8]),
+            ("region", Utf8, ["EU", "EU"]),
+            ("value", Int64, [700, 800])
+        )
+        .unwrap();
+        append_writer
+            .append_arrow_batch(eu_batch)
+            .await
+            .expect("Failed to append EU batch");
+
+        append_writer
+            .flush()
+            .await
+            .expect("Failed to flush batches");
+
+        admin
+            .drop_table(&table_path, false)
+            .await
+            .expect("Failed to drop table");
+
+        // todo: add scan test in 203
+    }
 }
diff --git a/crates/fluss/tests/integration/table_remote_scan.rs b/crates/fluss/tests/integration/table_remote_scan.rs
index c83da0f..ce0c137 100644
--- a/crates/fluss/tests/integration/table_remote_scan.rs
+++ b/crates/fluss/tests/integration/table_remote_scan.rs
@@ -40,10 +40,10 @@ mod table_remote_scan_test {
     use fluss::row::{GenericRow, InternalRow};
     use std::collections::HashMap;
     use std::sync::Arc;
+    use std::sync::atomic::AtomicUsize;
     use std::thread;
     use std::time::Duration;
     use uuid::Uuid;
-
     fn before_all() {
         // Create a new tokio runtime in a separate thread
         let cluster_lock = SHARED_FLUSS_CLUSTER.clone();
@@ -141,7 +141,8 @@ mod table_remote_scan_test {
         let append_writer = table
             .new_append()
             .expect("Failed to create append")
-            .create_writer();
+            .create_writer()
+            .expect("Failed to create writer");
 
         // append 20 rows, there must be some tiered to remote
         let record_count = 20;
@@ -151,7 +152,7 @@ mod table_remote_scan_test {
             let v = format!("v{}", i);
             row.set_field(1, v.as_str());
             append_writer
-                .append(row)
+                .append(&row)
                 .await
                 .expect("Failed to append row");
         }
diff --git a/crates/fluss/tests/integration/utils.rs b/crates/fluss/tests/integration/utils.rs
index 4d0c349..fd5145a 100644
--- a/crates/fluss/tests/integration/utils.rs
+++ b/crates/fluss/tests/integration/utils.rs
@@ -17,8 +17,9 @@
  */
 use crate::integration::fluss_cluster::{FlussTestingCluster, FlussTestingClusterBuilder};
 use fluss::client::FlussAdmin;
-use fluss::metadata::{TableDescriptor, TablePath};
+use fluss::metadata::{PartitionSpec, TableDescriptor, TablePath};
 use parking_lot::RwLock;
+use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::Duration;
 
@@ -102,3 +103,26 @@ pub fn get_cluster(cluster_lock: &RwLock<Option<FlussTestingCluster>>) -> Arc<Fl
             .clone(),
     )
 }
+
+/// Creates partitions for a partitioned table.
+///
+/// # Arguments
+/// * `admin` - The FlussAdmin instance
+/// * `table_path` - The table path
+/// * `partition_column` - The partition column name
+/// * `partition_values` - The partition values to create
+pub async fn create_partitions(
+    admin: &FlussAdmin,
+    table_path: &TablePath,
+    partition_column: &str,
+    partition_values: &[&str],
+) {
+    for value in partition_values {
+        let mut partition_map = HashMap::new();
+        partition_map.insert(partition_column.to_string(), value.to_string());
+        admin
+            .create_partition(table_path, &PartitionSpec::new(partition_map), true)
+            .await
+            .expect("Failed to create partition");
+    }
+}
diff --git a/crates/fluss/tests/test_fluss.rs b/crates/fluss/tests/test_fluss.rs
index f3987e6..a6cc27a 100644
--- a/crates/fluss/tests/test_fluss.rs
+++ b/crates/fluss/tests/test_fluss.rs
@@ -23,7 +23,7 @@ mod integration {
     mod admin;
     mod fluss_cluster;
     mod kv_table;
-    mod table;
+    mod log_table;
 
     mod utils;
 

Reply via email to