This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 83fa0aa  feat: Implement partitioning in Upsert / Lookup (#220)
83fa0aa is described below

commit 83fa0aa698140e613c3700d8811cd6a1258781f3
Author: Keith Lee <[email protected]>
AuthorDate: Fri Jan 30 08:08:25 2026 +0000

    feat: Implement partitioning in Upsert / Lookup (#220)
---
 crates/examples/Cargo.toml                         |   6 +-
 .../examples/src/example_partitioned_kv_table.rs   | 153 ++++++++++++++++
 crates/examples/src/example_table.rs               |   1 +
 crates/fluss/src/bucketing/mod.rs                  |   1 -
 crates/fluss/src/client/admin.rs                   |   8 +-
 crates/fluss/src/client/connection.rs              |   5 -
 crates/fluss/src/client/metadata.rs                |  81 ++++++--
 crates/fluss/src/client/table/append.rs            |  23 ++-
 crates/fluss/src/client/table/log_fetch_buffer.rs  |   9 +-
 crates/fluss/src/client/table/lookup.rs            |  95 +++++++---
 crates/fluss/src/client/table/mod.rs               |   2 +-
 crates/fluss/src/client/table/scanner.rs           |  19 +-
 crates/fluss/src/client/table/upsert.rs            |  39 +++-
 crates/fluss/src/client/table/writer.rs            |  55 ------
 crates/fluss/src/client/write/accumulator.rs       | 128 ++++++++-----
 crates/fluss/src/client/write/batch.rs             |  72 ++++----
 crates/fluss/src/client/write/bucket_assigner.rs   |  16 +-
 crates/fluss/src/client/write/mod.rs               |  33 +++-
 crates/fluss/src/client/write/sender.rs            | 118 +++++++++---
 crates/fluss/src/client/write/writer_client.rs     |  47 +++--
 crates/fluss/src/cluster/cluster.rs                | 204 ++++++++++++++++-----
 crates/fluss/src/cluster/mod.rs                    |  13 +-
 crates/fluss/src/error.rs                          |   6 +
 crates/fluss/src/metadata/datatype.rs              |   9 +
 crates/fluss/src/metadata/table.rs                 |  45 +++--
 crates/fluss/src/record/arrow.rs                   |  13 +-
 crates/fluss/src/rpc/message/update_metadata.rs    |  26 ++-
 crates/fluss/src/test_utils.rs                     |  11 +-
 crates/fluss/tests/integration/kv_table.rs         | 175 +++++++++++++++++-
 29 files changed, 1071 insertions(+), 342 deletions(-)

diff --git a/crates/examples/Cargo.toml b/crates/examples/Cargo.toml
index 117ceb2..16629be 100644
--- a/crates/examples/Cargo.toml
+++ b/crates/examples/Cargo.toml
@@ -33,4 +33,8 @@ path = "src/example_table.rs"
 
 [[example]]
 name = "example-upsert-lookup"
-path = "src/example_kv_table.rs"
\ No newline at end of file
+path = "src/example_kv_table.rs"
+
+[[example]]
+name = "example-partitioned-upsert-lookup"
+path = "src/example_partitioned_kv_table.rs"
\ No newline at end of file
diff --git a/crates/examples/src/example_partitioned_kv_table.rs 
b/crates/examples/src/example_partitioned_kv_table.rs
new file mode 100644
index 0000000..a5e76fa
--- /dev/null
+++ b/crates/examples/src/example_partitioned_kv_table.rs
@@ -0,0 +1,153 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use clap::Parser;
+use fluss::client::{FlussAdmin, FlussConnection, UpsertWriter};
+use fluss::config::Config;
+use fluss::error::Result;
+use fluss::metadata::{DataTypes, PartitionSpec, Schema, TableDescriptor, 
TablePath};
+use fluss::row::{GenericRow, InternalRow};
+use std::collections::HashMap;
+
+#[tokio::main]
+#[allow(dead_code)]
+pub async fn main() -> Result<()> {
+    let mut config = Config::parse();
+    config.bootstrap_server = Some("127.0.0.1:9123".to_string());
+
+    let conn = FlussConnection::new(config).await?;
+
+    let table_descriptor = TableDescriptor::builder()
+        .schema(
+            Schema::builder()
+                .column("id", DataTypes::int())
+                .column("region", DataTypes::string())
+                .column("zone", DataTypes::bigint())
+                .column("score", DataTypes::bigint())
+                .primary_key(vec![
+                    "id".to_string(),
+                    "region".to_string(),
+                    "zone".to_string(),
+                ])
+                .build()?,
+        )
+        .partitioned_by(vec!["region".to_string(), "zone".to_string()])
+        .build()?;
+
+    let table_path = TablePath::new("fluss".to_owned(), 
"partitioned_kv_example".to_owned());
+
+    let mut admin = conn.get_admin().await?;
+    admin
+        .create_table(&table_path, &table_descriptor, true)
+        .await?;
+    println!(
+        "Created KV Table:\n {}\n",
+        admin.get_table(&table_path).await?
+    );
+
+    create_partition(&table_path, &mut admin, "APAC", 1).await;
+    create_partition(&table_path, &mut admin, "EMEA", 2).await;
+    create_partition(&table_path, &mut admin, "US", 3).await;
+
+    let table = conn.get_table(&table_path).await?;
+    let table_upsert = table.new_upsert()?;
+    let mut upsert_writer = table_upsert.create_writer()?;
+
+    println!("\n=== Upserting ===");
+    for (id, region, zone, score) in [
+        (1001, "APAC", 1i64, 1234i64),
+        (1002, "EMEA", 2, 2234),
+        (1003, "US", 3, 3234),
+    ] {
+        let mut row = GenericRow::new(4);
+        row.set_field(0, id);
+        row.set_field(1, region);
+        row.set_field(2, zone);
+        row.set_field(3, score);
+        upsert_writer.upsert(&row).await?;
+        println!("Upserted: {row:?}");
+    }
+
+    println!("\n=== Looking up ===");
+    let mut lookuper = table.new_lookup()?.create_lookuper()?;
+
+    for (id, region, zone) in [(1001, "APAC", 1i64), (1002, "EMEA", 2), (1003, 
"US", 3)] {
+        let result = lookuper
+            .lookup(&make_key(id, region, zone))
+            .await
+            .expect("lookup");
+        let row = result.get_single_row()?.unwrap();
+        println!(
+            "Found id={id}: region={}, zone={}, score={}",
+            row.get_string(1),
+            row.get_long(2),
+            row.get_long(3)
+        );
+    }
+
+    println!("\n=== Updating ===");
+    let mut row = GenericRow::new(4);
+    row.set_field(0, 1001);
+    row.set_field(1, "APAC");
+    row.set_field(2, 1i64);
+    row.set_field(3, 4321i64);
+    upsert_writer.upsert(&row).await?;
+    println!("Updated: {row:?}");
+
+    let result = lookuper.lookup(&make_key(1001, "APAC", 1)).await?;
+    let row = result.get_single_row()?.unwrap();
+    println!(
+        "Verified update: region={}, zone={}",
+        row.get_string(1),
+        row.get_long(2)
+    );
+
+    println!("\n=== Deleting ===");
+    let mut row = GenericRow::new(4);
+    row.set_field(0, 1002);
+    row.set_field(1, "EMEA");
+    row.set_field(2, 2i64);
+    upsert_writer.delete(&row).await?;
+    println!("Deleted: {row:?}");
+
+    let result = lookuper.lookup(&make_key(1002, "EMEA", 2)).await?;
+    if result.get_single_row()?.is_none() {
+        println!("Verified deletion");
+    }
+
+    Ok(())
+}
+
+async fn create_partition(table_path: &TablePath, admin: &mut FlussAdmin, 
region: &str, zone: i64) {
+    let mut partition_values = HashMap::new();
+    partition_values.insert("region".to_string(), region.to_string());
+    partition_values.insert("zone".to_string(), zone.to_string());
+    let partition_spec = PartitionSpec::new(partition_values);
+
+    admin
+        .create_partition(table_path, &partition_spec, true)
+        .await
+        .unwrap();
+}
+
+fn make_key(id: i32, region: &str, zone: i64) -> GenericRow<'static> {
+    let mut row = GenericRow::new(4);
+    row.set_field(0, id);
+    row.set_field(1, region.to_string());
+    row.set_field(2, zone);
+    row
+}
diff --git a/crates/examples/src/example_table.rs 
b/crates/examples/src/example_table.rs
index ca6b942..92055a7 100644
--- a/crates/examples/src/example_table.rs
+++ b/crates/examples/src/example_table.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 mod example_kv_table;
+mod example_partitioned_kv_table;
 
 use clap::Parser;
 use fluss::client::FlussConnection;
diff --git a/crates/fluss/src/bucketing/mod.rs 
b/crates/fluss/src/bucketing/mod.rs
index 2611ac7..1b43d12 100644
--- a/crates/fluss/src/bucketing/mod.rs
+++ b/crates/fluss/src/bucketing/mod.rs
@@ -24,7 +24,6 @@ pub trait BucketingFunction: Sync + Send {
     fn bucketing(&self, bucket_key: &[u8], num_buckets: i32) -> Result<i32>;
 }
 
-#[allow(dead_code)]
 impl dyn BucketingFunction {
     /// Provides the bucketing function for a given [DataLakeFormat]
     ///
diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs
index 286c46c..ea1efc3 100644
--- a/crates/fluss/src/client/admin.rs
+++ b/crates/fluss/src/client/admin.rs
@@ -32,7 +32,7 @@ use crate::rpc::{RpcClient, ServerConnection};
 use crate::error::{Error, Result};
 use crate::proto::GetTableInfoResponse;
 use crate::{BucketId, PartitionId, TableId};
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::slice::from_ref;
 use std::sync::Arc;
 use tokio::task::JoinHandle;
@@ -111,6 +111,12 @@ impl FlussAdmin {
             .admin_gateway
             .request(GetTableRequest::new(table_path))
             .await?;
+
+        // force update to avoid stale data in cache
+        self.metadata
+            .update_tables_metadata(&HashSet::from([table_path]), 
&HashSet::new(), vec![])
+            .await?;
+
         let GetTableInfoResponse {
             table_id,
             schema_id,
diff --git a/crates/fluss/src/client/connection.rs 
b/crates/fluss/src/client/connection.rs
index a19dbd2..e021011 100644
--- a/crates/fluss/src/client/connection.rs
+++ b/crates/fluss/src/client/connection.rs
@@ -93,11 +93,6 @@ impl FlussConnection {
     pub async fn get_table(&self, table_path: &TablePath) -> 
Result<FlussTable<'_>> {
         self.metadata.update_table_metadata(table_path).await?;
         let table_info = 
self.metadata.get_cluster().get_table(table_path)?.clone();
-        if table_info.is_partitioned() {
-            return Err(crate::error::Error::UnsupportedOperation {
-                message: "Partitioned tables are not supported".to_string(),
-            });
-        }
         Ok(FlussTable::new(self, self.metadata.clone(), table_info))
     }
 }
diff --git a/crates/fluss/src/client/metadata.rs 
b/crates/fluss/src/client/metadata.rs
index 614f6e7..52ccd62 100644
--- a/crates/fluss/src/client/metadata.rs
+++ b/crates/fluss/src/client/metadata.rs
@@ -58,7 +58,14 @@ impl Metadata {
             ServerType::CoordinatorServer,
         );
         let con = connections.get_connection(&server_node).await?;
-        let response = con.request(UpdateMetadataRequest::new(&[])).await?;
+
+        let response = con
+            .request(UpdateMetadataRequest::new(
+                &HashSet::default(),
+                &HashSet::new(),
+                vec![],
+            ))
+            .await?;
         Cluster::from_metadata_response(response, None)
     }
 
@@ -95,7 +102,12 @@ impl Metadata {
         Ok(())
     }
 
-    pub async fn update_tables_metadata(&self, table_paths: 
&HashSet<&TablePath>) -> Result<()> {
+    pub async fn update_tables_metadata(
+        &self,
+        table_paths: &HashSet<&TablePath>,
+        physical_table_paths: &HashSet<&Arc<PhysicalTablePath>>,
+        partition_ids: Vec<i64>,
+    ) -> Result<()> {
         let maybe_server = {
             let guard = self.cluster.read();
             guard.get_one_available_server().cloned()
@@ -114,16 +126,19 @@ impl Metadata {
 
         let conn = self.connections.get_connection(&server).await?;
 
-        let update_table_paths: Vec<&TablePath> = 
table_paths.iter().copied().collect();
         let response = conn
-            .request(UpdateMetadataRequest::new(update_table_paths.as_slice()))
+            .request(UpdateMetadataRequest::new(
+                table_paths,
+                physical_table_paths,
+                partition_ids,
+            ))
             .await?;
         self.update(response).await?;
         Ok(())
     }
 
     pub async fn update_table_metadata(&self, table_path: &TablePath) -> 
Result<()> {
-        self.update_tables_metadata(&HashSet::from([table_path]))
+        self.update_tables_metadata(&HashSet::from([table_path]), 
&HashSet::new(), vec![])
             .await
     }
 
@@ -133,8 +148,9 @@ impl Metadata {
             .iter()
             .filter(|table_path| 
cluster_binding.opt_get_table(table_path).is_none())
             .collect();
+
         if !need_update_table_paths.is_empty() {
-            self.update_tables_metadata(&need_update_table_paths)
+            self.update_tables_metadata(&need_update_table_paths, 
&HashSet::new(), vec![])
                 .await?;
         }
         Ok(())
@@ -150,7 +166,48 @@ impl Metadata {
         guard.clone()
     }
 
-    pub fn leader_for(&self, table_bucket: &TableBucket) -> Option<ServerNode> 
{
+    const MAX_RETRY_TIMES: u8 = 3;
+
+    pub async fn leader_for(
+        &self,
+        table_path: &TablePath,
+        table_bucket: &TableBucket,
+    ) -> Result<Option<ServerNode>> {
+        let leader = self.get_leader_for(table_bucket);
+
+        if leader.is_some() {
+            Ok(leader)
+        } else {
+            for _ in 0..Self::MAX_RETRY_TIMES {
+                if let Some(partition_id) = table_bucket.partition_id() {
+                    self.update_tables_metadata(
+                        &HashSet::from([table_path]),
+                        &HashSet::new(),
+                        vec![partition_id],
+                    )
+                    .await?;
+                } else {
+                    self.update_tables_metadata(
+                        &HashSet::from([table_path]),
+                        &HashSet::new(),
+                        vec![],
+                    )
+                    .await?;
+                }
+
+                let cluster = self.cluster.read();
+                let leader = cluster.leader_for(table_bucket);
+
+                if leader.is_some() {
+                    return Ok(leader.cloned());
+                }
+            }
+
+            Ok(None)
+        }
+    }
+
+    fn get_leader_for(&self, table_bucket: &TableBucket) -> Option<ServerNode> 
{
         let cluster = self.cluster.read();
         cluster.leader_for(table_bucket).cloned()
     }
@@ -173,14 +230,16 @@ mod tests {
     use crate::metadata::{TableBucket, TablePath};
     use crate::test_utils::build_cluster_arc;
 
-    #[test]
-    fn leader_for_returns_server() {
+    #[tokio::test]
+    async fn leader_for_returns_server() {
         let table_path = TablePath::new("db".to_string(), "tbl".to_string());
         let cluster = build_cluster_arc(&table_path, 1, 1);
         let metadata = Metadata::new_for_test(cluster);
         let leader = metadata
-            .leader_for(&TableBucket::new(1, 0))
-            .expect("leader");
+            .leader_for(&table_path, &TableBucket::new(1, 0))
+            .await
+            .expect("leader request should be Ok")
+            .expect("leader should exist");
         assert_eq!(leader.id(), 1);
     }
 
diff --git a/crates/fluss/src/client/table/append.rs 
b/crates/fluss/src/client/table/append.rs
index 6d76f28..7fe2023 100644
--- a/crates/fluss/src/client/table/append.rs
+++ b/crates/fluss/src/client/table/append.rs
@@ -17,7 +17,7 @@
 
 use crate::client::{WriteRecord, WriterClient};
 use crate::error::Result;
-use crate::metadata::{TableInfo, TablePath};
+use crate::metadata::{PhysicalTablePath, TableInfo, TablePath};
 use crate::row::GenericRow;
 use arrow::array::RecordBatch;
 use std::sync::Arc;
@@ -25,14 +25,14 @@ use std::sync::Arc;
 #[allow(dead_code)]
 pub struct TableAppend {
     table_path: TablePath,
-    table_info: TableInfo,
+    table_info: Arc<TableInfo>,
     writer_client: Arc<WriterClient>,
 }
 
 impl TableAppend {
     pub(super) fn new(
         table_path: TablePath,
-        table_info: TableInfo,
+        table_info: Arc<TableInfo>,
         writer_client: Arc<WriterClient>,
     ) -> Self {
         Self {
@@ -44,23 +44,27 @@ impl TableAppend {
 
     pub fn create_writer(&self) -> AppendWriter {
         AppendWriter {
-            table_path: Arc::new(self.table_path.clone()),
+            physical_table_path: 
Arc::new(PhysicalTablePath::of(Arc::new(self.table_path.clone()))),
             writer_client: self.writer_client.clone(),
-            table_info: Arc::new(self.table_info.clone()),
+            table_info: Arc::clone(&self.table_info),
         }
     }
 }
 
 pub struct AppendWriter {
-    table_path: Arc<TablePath>,
+    physical_table_path: Arc<PhysicalTablePath>,
     writer_client: Arc<WriterClient>,
     table_info: Arc<TableInfo>,
 }
 
 impl AppendWriter {
     pub async fn append(&self, row: GenericRow<'_>) -> Result<()> {
-        let record =
-            WriteRecord::for_append(self.table_path.clone(), 
self.table_info.schema_id, row);
+        let record = WriteRecord::for_append(
+            Arc::clone(&self.table_info),
+            Arc::clone(&self.physical_table_path),
+            self.table_info.schema_id,
+            row,
+        );
         let result_handle = self.writer_client.send(&record).await?;
         let result = result_handle.wait().await?;
         result_handle.result(result)
@@ -68,7 +72,8 @@ impl AppendWriter {
 
     pub async fn append_arrow_batch(&self, batch: RecordBatch) -> Result<()> {
         let record = WriteRecord::for_append_record_batch(
-            self.table_path.clone(),
+            Arc::clone(&self.table_info),
+            Arc::clone(&self.physical_table_path),
             self.table_info.schema_id,
             batch,
         );
diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs 
b/crates/fluss/src/client/table/log_fetch_buffer.rs
index 7ece34b..78ee065 100644
--- a/crates/fluss/src/client/table/log_fetch_buffer.rs
+++ b/crates/fluss/src/client/table/log_fetch_buffer.rs
@@ -833,9 +833,10 @@ mod tests {
     use crate::compression::{
         ArrowCompressionInfo, ArrowCompressionType, 
DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
     };
-    use crate::metadata::{DataField, DataTypes, RowType, TablePath};
+    use crate::metadata::{DataField, DataTypes, PhysicalTablePath, RowType, 
TablePath};
     use crate::record::{MemoryLogRecordsArrowBuilder, ReadContext, 
to_arrow_schema};
     use crate::row::GenericRow;
+    use crate::test_utils::build_table_info;
     use std::sync::Arc;
 
     fn test_read_context() -> Result<ReadContext> {
@@ -899,7 +900,9 @@ mod tests {
             DataField::new("id".to_string(), DataTypes::int(), None),
             DataField::new("name".to_string(), DataTypes::string(), None),
         ]);
-        let table_path = Arc::new(TablePath::new("db".to_string(), 
"tbl".to_string()));
+        let table_path = TablePath::new("db".to_string(), "tbl".to_string());
+        let table_info = Arc::new(build_table_info(table_path.clone(), 1, 1));
+        let physical_table_path = 
Arc::new(PhysicalTablePath::of(Arc::new(table_path)));
 
         let mut builder = MemoryLogRecordsArrowBuilder::new(
             1,
@@ -914,7 +917,7 @@ mod tests {
         let mut row = GenericRow::new(2);
         row.set_field(0, 1_i32);
         row.set_field(1, "alice");
-        let record = WriteRecord::for_append(table_path, 1, row);
+        let record = WriteRecord::for_append(table_info, physical_table_path, 
1, row);
         builder.append(&record)?;
 
         let data = builder.build()?;
diff --git a/crates/fluss/src/client/table/lookup.rs 
b/crates/fluss/src/client/table/lookup.rs
index 4e89176..69cb91e 100644
--- a/crates/fluss/src/client/table/lookup.rs
+++ b/crates/fluss/src/client/table/lookup.rs
@@ -18,8 +18,9 @@
 use crate::bucketing::BucketingFunction;
 use crate::client::connection::FlussConnection;
 use crate::client::metadata::Metadata;
+use crate::client::table::partition_getter::PartitionGetter;
 use crate::error::{Error, Result};
-use crate::metadata::{RowType, TableBucket, TableInfo};
+use crate::metadata::{PhysicalTablePath, RowType, TableBucket, TableInfo, 
TablePath};
 use crate::record::kv::SCHEMA_ID_LENGTH;
 use crate::row::InternalRow;
 use crate::row::compacted::CompactedRow;
@@ -133,20 +134,43 @@ impl<'a> TableLookup<'a> {
         let data_lake_format = 
self.table_info.get_table_config().get_datalake_format()?;
         let bucketing_function = <dyn 
BucketingFunction>::of(data_lake_format.as_ref());
 
-        // Create key encoder for the primary key fields
-        let pk_fields = self.table_info.get_physical_primary_keys().to_vec();
-        let key_encoder = KeyEncoderFactory::of(
-            self.table_info.row_type(),
-            pk_fields.as_slice(),
-            &data_lake_format,
-        )?;
+        let row_type = self.table_info.row_type();
+        let primary_keys = self.table_info.get_primary_keys();
+        let lookup_row_type = row_type.project_with_field_names(primary_keys)?;
+
+        let physical_primary_keys = 
self.table_info.get_physical_primary_keys().to_vec();
+        let primary_key_encoder =
+            KeyEncoderFactory::of(&lookup_row_type, &physical_primary_keys, 
&data_lake_format)?;
+
+        let bucket_key_encoder = if self.table_info.is_default_bucket_key() {
+            None
+        } else {
+            let bucket_keys = self.table_info.get_bucket_keys().to_vec();
+            Some(KeyEncoderFactory::of(
+                &lookup_row_type,
+                &bucket_keys,
+                &data_lake_format,
+            )?)
+        };
+
+        let partition_getter = if self.table_info.is_partitioned() {
+            Some(PartitionGetter::new(
+                &lookup_row_type,
+                Arc::clone(self.table_info.get_partition_keys()),
+            )?)
+        } else {
+            None
+        };
 
         Ok(Lookuper {
             conn: self.conn,
+            table_path: Arc::new(self.table_info.table_path.clone()),
             table_info: self.table_info,
             metadata: self.metadata,
             bucketing_function,
-            key_encoder,
+            primary_key_encoder,
+            bucket_key_encoder,
+            partition_getter,
             num_buckets,
         })
     }
@@ -163,13 +187,15 @@ impl<'a> TableLookup<'a> {
 /// let row = GenericRow::new(vec![Datum::Int32(42)]); // lookup key
 /// let result = lookuper.lookup(&row).await?;
 /// ```
-// TODO: Support partitioned tables (extract partition from key)
 pub struct Lookuper<'a> {
     conn: &'a FlussConnection,
     table_info: TableInfo,
+    table_path: Arc<TablePath>,
     metadata: Arc<Metadata>,
     bucketing_function: Box<dyn BucketingFunction>,
-    key_encoder: Box<dyn KeyEncoder>,
+    primary_key_encoder: Box<dyn KeyEncoder>,
+    bucket_key_encoder: Option<Box<dyn KeyEncoder>>,
+    partition_getter: Option<PartitionGetter>,
     num_buckets: i32,
 }
 
@@ -187,26 +213,47 @@ impl<'a> Lookuper<'a> {
     /// * `Err(Error)` - If the lookup fails
     pub async fn lookup(&mut self, row: &dyn InternalRow) -> 
Result<LookupResult<'_>> {
         // todo: support batch lookup
-        // Encode the key from the row
-        let encoded_key = self.key_encoder.encode_key(row)?;
-        let key_bytes = encoded_key.to_vec();
+        let pk_bytes = self.primary_key_encoder.encode_key(row)?;
+        let pk_bytes_vec = pk_bytes.to_vec();
+        let bk_bytes = match &mut self.bucket_key_encoder {
+            Some(encoder) => &encoder.encode_key(row)?,
+            None => &pk_bytes,
+        };
+
+        let partition_id = if let Some(ref partition_getter) = 
self.partition_getter {
+            let partition_name = partition_getter.get_partition(row)?;
+            let physical_table_path = PhysicalTablePath::of_partitioned(
+                Arc::clone(&self.table_path),
+                Some(partition_name),
+            );
+            let cluster = self.metadata.get_cluster();
+            match cluster.get_partition_id(&physical_table_path) {
+                Some(id) => Some(id),
+                None => {
+                    // Partition doesn't exist, return empty result (like Java)
+                    return Ok(LookupResult::empty(self.table_info.row_type()));
+                }
+            }
+        } else {
+            None
+        };
 
-        // Compute bucket from encoded key
         let bucket_id = self
             .bucketing_function
-            .bucketing(&key_bytes, self.num_buckets)?;
+            .bucketing(bk_bytes, self.num_buckets)?;
 
         let table_id = self.table_info.get_table_id();
-        let table_bucket = TableBucket::new(table_id, bucket_id);
+        let table_bucket = TableBucket::new_with_partition(table_id, 
partition_id, bucket_id);
 
         // Find the leader for this bucket
         let cluster = self.metadata.get_cluster();
-        let leader =
-            cluster
-                .leader_for(&table_bucket)
-                .ok_or_else(|| Error::LeaderNotAvailable {
-                    message: format!("No leader found for table bucket: 
{table_bucket}"),
-                })?;
+        let leader = self
+            .metadata
+            .leader_for(self.table_path.as_ref(), &table_bucket)
+            .await?
+            .ok_or_else(|| Error::LeaderNotAvailable {
+                message: format!("No leader found for table bucket: 
{table_bucket}"),
+            })?;
 
         // Get connection to the tablet server
         let tablet_server =
@@ -223,7 +270,7 @@ impl<'a> Lookuper<'a> {
         let connection = connections.get_connection(tablet_server).await?;
 
         // Send lookup request
-        let request = LookupRequest::new(table_id, None, bucket_id, 
vec![key_bytes]);
+        let request = LookupRequest::new(table_id, partition_id, bucket_id, 
vec![pk_bytes_vec]);
         let response = connection.request(request).await?;
 
         // Extract the values from response
diff --git a/crates/fluss/src/client/table/mod.rs 
b/crates/fluss/src/client/table/mod.rs
index 2dc56d5..2fbbbc9 100644
--- a/crates/fluss/src/client/table/mod.rs
+++ b/crates/fluss/src/client/table/mod.rs
@@ -69,7 +69,7 @@ impl<'a> FlussTable<'a> {
     pub fn new_append(&self) -> Result<TableAppend> {
         Ok(TableAppend::new(
             self.table_path.clone(),
-            self.table_info.clone(),
+            Arc::new(self.table_info.clone()),
             self.conn.get_or_create_writer_client()?,
         ))
     }
diff --git a/crates/fluss/src/client/table/scanner.rs 
b/crates/fluss/src/client/table/scanner.rs
index 356ba1c..14d2841 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -630,7 +630,7 @@ impl LogFetcher {
         if self.is_partitioned {
             // Fallback to full table metadata refresh until partition-aware 
updates are available.
             self.metadata
-                .update_tables_metadata(&HashSet::from([&self.table_path]))
+                .update_tables_metadata(&HashSet::from([&self.table_path]), 
&HashSet::new(), vec![])
                 .await
                 .or_else(|e| {
                     if let Error::RpcError { source, .. } = &e
@@ -649,7 +649,7 @@ impl LogFetcher {
 
         // TODO: Handle PartitionNotExist error
         self.metadata
-            .update_tables_metadata(&HashSet::from([&self.table_path]))
+            .update_tables_metadata(&HashSet::from([&self.table_path]), 
&HashSet::new(), vec![])
             .await
             .or_else(|e| {
                 if let Error::RpcError { source, .. } = &e
@@ -799,8 +799,9 @@ impl LogFetcher {
                         let table_id = table_bucket.table_id();
                         let cluster = metadata.get_cluster();
                         if let Some(table_path) = 
cluster.get_table_path_by_id(table_id) {
-                            let physical_tables =
-                                
HashSet::from([PhysicalTablePath::of(table_path.clone())]);
+                            let physical_tables = 
HashSet::from([PhysicalTablePath::of(Arc::new(
+                                table_path.clone(),
+                            ))]);
                             
metadata.invalidate_physical_table_meta(&physical_tables);
                         } else {
                             warn!(
@@ -1498,7 +1499,7 @@ mod tests {
     use crate::compression::{
         ArrowCompressionInfo, ArrowCompressionType, 
DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
     };
-    use crate::metadata::{TableInfo, TablePath};
+    use crate::metadata::{PhysicalTablePath, TableInfo, TablePath};
     use crate::record::MemoryLogRecordsArrowBuilder;
     use crate::row::{Datum, GenericRow};
     use crate::rpc::FlussError;
@@ -1514,8 +1515,10 @@ mod tests {
                 compression_level: DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
             },
         )?;
+        let physical_table_path = Arc::new(PhysicalTablePath::of(table_path));
         let record = WriteRecord::for_append(
-            table_path,
+            Arc::new(table_info.clone()),
+            physical_table_path,
             1,
             GenericRow {
                 values: vec![Datum::Int32(1)],
@@ -1684,7 +1687,7 @@ mod tests {
         )?;
 
         let bucket = TableBucket::new(1, 0);
-        assert!(metadata.leader_for(&bucket).is_some());
+        assert!(metadata.leader_for(&table_path, &bucket).await?.is_some());
 
         let response = crate::proto::FetchLogResponse {
             tables_resp: vec![crate::proto::PbFetchLogRespForTable {
@@ -1713,7 +1716,7 @@ mod tests {
 
         LogFetcher::handle_fetch_response(response, response_context).await;
 
-        assert!(metadata.leader_for(&bucket).is_none());
+        assert!(metadata.get_cluster().leader_for(&bucket).is_none());
         Ok(())
     }
 }
diff --git a/crates/fluss/src/client/table/upsert.rs 
b/crates/fluss/src/client/table/upsert.rs
index 984592d..269d525 100644
--- a/crates/fluss/src/client/table/upsert.rs
+++ b/crates/fluss/src/client/table/upsert.rs
@@ -19,12 +19,13 @@ use crate::client::table::writer::{DeleteResult, 
TableWriter, UpsertResult, Upse
 use crate::client::{RowBytes, WriteFormat, WriteRecord, WriterClient};
 use crate::error::Error::IllegalArgument;
 use crate::error::Result;
-use crate::metadata::{KvFormat, RowType, TableInfo, TablePath};
+use crate::metadata::{PhysicalTablePath, RowType, TableInfo, TablePath};
 use crate::row::InternalRow;
 use crate::row::encode::{KeyEncoder, KeyEncoderFactory, RowEncoder, 
RowEncoderFactory};
 use crate::row::field_getter::FieldGetter;
 use std::sync::Arc;
 
+use crate::client::table::partition_getter::PartitionGetter;
 use bitvec::prelude::bitvec;
 use bytes::Bytes;
 
@@ -107,30 +108,25 @@ impl TableUpsert {
     }
 }
 
-#[allow(dead_code)]
 struct UpsertWriterImpl<RE>
 where
     RE: RowEncoder,
 {
     table_path: Arc<TablePath>,
     writer_client: Arc<WriterClient>,
-    // TODO: Partitioning
-    // partition_field_getter: Option<Box<dyn KeyEncoder>>,
+    partition_field_getter: Option<PartitionGetter>,
     primary_key_encoder: Box<dyn KeyEncoder>,
     target_columns: Option<Arc<Vec<usize>>>,
     // Use primary key encoder as bucket key encoder when None
     bucket_key_encoder: Option<Box<dyn KeyEncoder>>,
-    kv_format: KvFormat,
     write_format: WriteFormat,
     row_encoder: RE,
     field_getters: Box<[FieldGetter]>,
     table_info: Arc<TableInfo>,
 }
 
-#[allow(dead_code)]
 struct UpsertWriterFactory;
 
-#[allow(dead_code)]
 impl UpsertWriterFactory {
     pub fn create(
         table_path: Arc<TablePath>,
@@ -168,13 +164,22 @@ impl UpsertWriterFactory {
 
         let field_getters = FieldGetter::create_field_getters(row_type);
 
+        let partition_field_getter = if table_info.is_partitioned() {
+            Some(PartitionGetter::new(
+                row_type,
+                Arc::clone(table_info.get_partition_keys()),
+            )?)
+        } else {
+            None
+        };
+
         Ok(UpsertWriterImpl {
             table_path,
+            partition_field_getter,
             writer_client,
             primary_key_encoder,
             target_columns: partial_update_columns,
             bucket_key_encoder,
-            kv_format: kv_format.clone(),
             write_format,
             row_encoder: RowEncoderFactory::create(kv_format, 
row_type.clone())?,
             field_getters,
@@ -311,6 +316,18 @@ impl<RE: RowEncoder> UpsertWriterImpl<RE> {
         }
         self.row_encoder.finish_row()
     }
+
+    fn get_physical_path<R: InternalRow>(&self, row: &R) -> 
Result<PhysicalTablePath> {
+        if let Some(partition_getter) = &self.partition_field_getter {
+            let partition = partition_getter.get_partition(row);
+            Ok(PhysicalTablePath::of_partitioned(
+                Arc::clone(&self.table_path),
+                Some(partition?),
+            ))
+        } else {
+            Ok(PhysicalTablePath::of(Arc::clone(&self.table_path)))
+        }
+    }
 }
 
 impl<RE: RowEncoder> TableWriter for UpsertWriterImpl<RE> {
@@ -343,7 +360,8 @@ impl<RE: RowEncoder> UpsertWriter for UpsertWriterImpl<RE> {
         };
 
         let write_record = WriteRecord::for_upsert(
-            Arc::clone(&self.table_path),
+            Arc::clone(&self.table_info),
+            Arc::new(self.get_physical_path(row)?),
             self.table_info.schema_id,
             key,
             bucket_key,
@@ -372,7 +390,8 @@ impl<RE: RowEncoder> UpsertWriter for UpsertWriterImpl<RE> {
         let (key, bucket_key) = self.get_keys(row)?;
 
         let write_record = WriteRecord::for_upsert(
-            Arc::clone(&self.table_path),
+            Arc::clone(&self.table_info),
+            Arc::new(self.get_physical_path(row)?),
             self.table_info.schema_id,
             key,
             bucket_key,
diff --git a/crates/fluss/src/client/table/writer.rs 
b/crates/fluss/src/client/table/writer.rs
index 8276545..ec26ec6 100644
--- a/crates/fluss/src/client/table/writer.rs
+++ b/crates/fluss/src/client/table/writer.rs
@@ -15,12 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::client::{WriteRecord, WriterClient};
 use crate::row::{GenericRow, InternalRow};
-use std::sync::Arc;
 
 use crate::error::Result;
-use crate::metadata::{TableInfo, TablePath};
 
 #[allow(dead_code, async_fn_in_trait)]
 pub trait TableWriter {
@@ -47,55 +44,3 @@ pub struct UpsertResult;
 /// Currently this is an empty struct to allow for compatible evolution in the 
future
 #[derive(Default)]
 pub struct DeleteResult;
-
-#[allow(dead_code)]
-pub struct AbstractTableWriter {
-    table_path: Arc<TablePath>,
-    writer_client: Arc<WriterClient>,
-    field_count: i32,
-    schema_id: i32,
-}
-
-#[allow(dead_code)]
-impl AbstractTableWriter {
-    pub fn new(
-        table_path: TablePath,
-        table_info: &TableInfo,
-        writer_client: Arc<WriterClient>,
-    ) -> Self {
-        // todo: partition
-        Self {
-            table_path: Arc::new(table_path),
-            writer_client,
-            field_count: table_info.row_type().fields().len() as i32,
-            schema_id: table_info.schema_id,
-        }
-    }
-
-    pub async fn send(&self, write_record: &WriteRecord<'_>) -> Result<()> {
-        let result_handle = self.writer_client.send(write_record).await?;
-        let result = result_handle.wait().await?;
-        result_handle.result(result)
-    }
-}
-
-impl TableWriter for AbstractTableWriter {
-    async fn flush(&self) -> Result<()> {
-        todo!()
-    }
-}
-
-// Append writer implementation
-#[allow(dead_code)]
-pub struct AppendWriterImpl {
-    base: AbstractTableWriter,
-}
-
-#[allow(dead_code)]
-impl AppendWriterImpl {
-    pub async fn append(&self, row: GenericRow<'_>) -> Result<()> {
-        let record =
-            WriteRecord::for_append(self.base.table_path.clone(), 
self.base.schema_id, row);
-        self.base.send(&record).await
-    }
-}
diff --git a/crates/fluss/src/client/write/accumulator.rs 
b/crates/fluss/src/client/write/accumulator.rs
index a5b9832..2a45517 100644
--- a/crates/fluss/src/client/write/accumulator.rs
+++ b/crates/fluss/src/client/write/accumulator.rs
@@ -21,7 +21,7 @@ use crate::client::{LogWriteRecord, Record, ResultHandle, 
WriteRecord};
 use crate::cluster::{BucketLocation, Cluster, ServerNode};
 use crate::config::Config;
 use crate::error::Result;
-use crate::metadata::{TableBucket, TablePath};
+use crate::metadata::{PhysicalTablePath, TableBucket};
 use crate::util::current_time_ms;
 use crate::{BucketId, PartitionId, TableId};
 use dashmap::DashMap;
@@ -37,7 +37,7 @@ type BucketBatches = Vec<(BucketId, 
Arc<Mutex<VecDeque<WriteBatch>>>)>;
 #[allow(dead_code)]
 pub struct RecordAccumulator {
     config: Config,
-    write_batches: DashMap<TablePath, BucketAndWriteBatches>,
+    write_batches: DashMap<Arc<PhysicalTablePath>, BucketAndWriteBatches>,
     // batch_id -> complete callback
     incomplete_batches: RwLock<HashMap<i64, ResultHandle>>,
     batch_timeout_ms: i64,
@@ -88,14 +88,14 @@ impl RecordAccumulator {
         &self,
         cluster: &Cluster,
         record: &WriteRecord,
-        bucket_id: BucketId,
         dq: &mut VecDeque<WriteBatch>,
     ) -> Result<RecordAppendResult> {
         if let Some(append_result) = self.try_append(record, dq)? {
             return Ok(append_result);
         }
 
-        let table_path = &record.table_path;
+        let physical_table_path = &record.physical_table_path;
+        let table_path = physical_table_path.get_table_path();
         let table_info = cluster.get_table(table_path)?;
         let arrow_compression_info = 
table_info.get_table_config().get_arrow_compression_info()?;
         let row_type = &table_info.row_type;
@@ -105,22 +105,20 @@ impl RecordAccumulator {
         let mut batch: WriteBatch = match record.record() {
             Record::Log(_) => ArrowLog(ArrowLogWriteBatch::new(
                 self.batch_id.fetch_add(1, Ordering::Relaxed),
-                table_path.as_ref().clone(),
+                Arc::clone(physical_table_path),
                 schema_id,
                 arrow_compression_info,
                 row_type,
-                bucket_id,
                 current_time_ms(),
                 matches!(&record.record, 
Record::Log(LogWriteRecord::RecordBatch(_))),
             )?),
             Record::Kv(kv_record) => Kv(KvWriteBatch::new(
                 self.batch_id.fetch_add(1, Ordering::Relaxed),
-                table_path.as_ref().clone(),
+                Arc::clone(physical_table_path),
                 schema_id,
                 // TODO: Decide how to derive write limit in the absence of 
java's equivalent of PreAllocatedPagedOutputView
                 KvWriteBatch::DEFAULT_WRITE_LIMIT,
                 record.write_format.to_kv_format()?,
-                bucket_id,
                 kv_record.target_columns.clone(),
                 current_time_ms(),
             )),
@@ -153,18 +151,25 @@ impl RecordAccumulator {
         cluster: &Cluster,
         abort_if_batch_full: bool,
     ) -> Result<RecordAppendResult> {
-        let table_path = &record.table_path;
+        let physical_table_path = &record.physical_table_path;
+        let table_path = physical_table_path.get_table_path();
+        let table_info = cluster.get_table(table_path)?;
+        let is_partitioned_table = table_info.is_partitioned();
 
-        // TODO: Implement partitioning
+        let partition_id = if is_partitioned_table {
+            cluster.get_partition_id(physical_table_path)
+        } else {
+            None
+        };
 
         let dq = {
             let mut binding = self
                 .write_batches
-                .entry(table_path.as_ref().clone())
+                .entry(Arc::clone(physical_table_path))
                 .or_insert_with(|| BucketAndWriteBatches {
-                    table_id: 0,
-                    is_partitioned_table: false,
-                    partition_id: None,
+                    table_id: table_info.table_id,
+                    is_partitioned_table,
+                    partition_id,
                     batches: Default::default(),
                 });
             let bucket_and_batches = binding.value_mut();
@@ -185,23 +190,24 @@ impl RecordAccumulator {
                 true, false, true,
             ));
         }
-        self.append_new_batch(cluster, record, bucket_id, &mut dq_guard)
+        self.append_new_batch(cluster, record, &mut dq_guard)
     }
 
     pub async fn ready(&self, cluster: &Arc<Cluster>) -> 
Result<ReadyCheckResult> {
         // Snapshot just the Arcs we need, avoiding cloning the entire 
BucketAndWriteBatches struct
-        let entries: Vec<(TablePath, BucketBatches)> = self
+        let entries: Vec<(Arc<PhysicalTablePath>, Option<PartitionId>, 
BucketBatches)> = self
             .write_batches
             .iter()
             .map(|entry| {
-                let table_path = entry.key().clone();
+                let physical_table_path = Arc::clone(entry.key());
+                let partition_id = entry.value().partition_id;
                 let bucket_batches: Vec<_> = entry
                     .value()
                     .batches
                     .iter()
                     .map(|(bucket_id, batch_arc)| (*bucket_id, 
batch_arc.clone()))
                     .collect();
-                (table_path, bucket_batches)
+                (physical_table_path, partition_id, bucket_batches)
             })
             .collect();
 
@@ -209,10 +215,12 @@ impl RecordAccumulator {
         let mut next_ready_check_delay_ms = self.batch_timeout_ms;
         let mut unknown_leader_tables = HashSet::new();
 
-        for (table_path, bucket_batches) in entries {
+        for (physical_table_path, mut partition_id, bucket_batches) in entries 
{
             next_ready_check_delay_ms = self
                 .bucket_ready(
-                    &table_path,
+                    &physical_table_path,
+                    physical_table_path.get_partition_name().is_some(),
+                    &mut partition_id,
                     bucket_batches,
                     &mut ready_nodes,
                     &mut unknown_leader_tables,
@@ -229,17 +237,42 @@ impl RecordAccumulator {
         })
     }
 
+    #[allow(clippy::too_many_arguments)]
     async fn bucket_ready(
         &self,
-        table_path: &TablePath,
+        physical_table_path: &Arc<PhysicalTablePath>,
+        is_partitioned_table: bool,
+        partition_id: &mut Option<PartitionId>,
         bucket_batches: BucketBatches,
         ready_nodes: &mut HashSet<ServerNode>,
-        unknown_leader_tables: &mut HashSet<TablePath>,
+        unknown_leader_tables: &mut HashSet<Arc<PhysicalTablePath>>,
         cluster: &Cluster,
         next_ready_check_delay_ms: i64,
     ) -> Result<i64> {
         let mut next_delay = next_ready_check_delay_ms;
 
+        // First check this table has partitionId.
+        if is_partitioned_table && partition_id.is_none() {
+            let partition_id = cluster.get_partition_id(physical_table_path);
+
+            if partition_id.is_some() {
+                // Update the cached partition_id
+                if let Some(mut entry) = 
self.write_batches.get_mut(physical_table_path) {
+                    entry.partition_id = partition_id;
+                }
+            } else {
+                log::debug!(
+                    "Partition does not exist for {}, bucket will not be set 
to ready",
+                    physical_table_path.as_ref()
+                );
+
+                // TODO: we shouldn't add unready partitions to 
unknownLeaderTables,
+                // because it causes PartitionNotExistException later
+                unknown_leader_tables.insert(Arc::clone(physical_table_path));
+                return Ok(next_delay);
+            }
+        }
+
         for (bucket_id, batch) in bucket_batches {
             let batch_guard = batch.lock().await;
             if batch_guard.is_empty() {
@@ -250,12 +283,12 @@ impl RecordAccumulator {
             let waited_time_ms = batch.waited_time_ms(current_time_ms());
             let deque_size = batch_guard.len();
             let full = deque_size > 1 || batch.is_closed();
-            let table_bucket = cluster.get_table_bucket(table_path, 
bucket_id)?;
+            let table_bucket = cluster.get_table_bucket(physical_table_path, 
bucket_id)?;
             if let Some(leader) = cluster.leader_for(&table_bucket) {
                 next_delay =
                     self.batch_ready(leader, waited_time_ms, full, 
ready_nodes, next_delay);
             } else {
-                unknown_leader_tables.insert(table_path.clone());
+                unknown_leader_tables.insert(Arc::clone(physical_table_path));
             }
         }
         Ok(next_delay)
@@ -332,14 +365,14 @@ impl RecordAccumulator {
 
         loop {
             let bucket = &buckets[current_index];
-            let table_path = bucket.table_path.clone();
+            let table_path = bucket.physical_table_path();
             let table_bucket = bucket.table_bucket.clone();
             last_processed_index = current_index;
             current_index = (current_index + 1) % buckets.len();
 
             let deque = self
                 .write_batches
-                .get(&table_path)
+                .get(table_path)
                 .and_then(|bucket_and_write_batches| {
                     bucket_and_write_batches
                         .batches
@@ -399,20 +432,22 @@ impl RecordAccumulator {
 
     pub async fn re_enqueue(&self, ready_write_batch: ReadyWriteBatch) {
         ready_write_batch.write_batch.re_enqueued();
-        let table_path = ready_write_batch.write_batch.table_path().clone();
+        let physical_table_path = 
ready_write_batch.write_batch.physical_table_path();
         let bucket_id = ready_write_batch.table_bucket.bucket_id();
         let table_id = ready_write_batch.table_bucket.table_id();
+        let partition_id = ready_write_batch.table_bucket.partition_id();
+        let is_partitioned_table = partition_id.is_some();
 
         let dq = {
-            let mut binding =
-                self.write_batches
-                    .entry(table_path)
-                    .or_insert_with(|| BucketAndWriteBatches {
-                        table_id,
-                        is_partitioned_table: false,
-                        partition_id: None,
-                        batches: Default::default(),
-                    });
+            let mut binding = self
+                .write_batches
+                .entry(Arc::clone(physical_table_path))
+                .or_insert_with(|| BucketAndWriteBatches {
+                    table_id,
+                    is_partitioned_table,
+                    partition_id,
+                    batches: Default::default(),
+                });
             let bucket_and_batches = binding.value_mut();
             bucket_and_batches
                 .batches
@@ -478,6 +513,12 @@ pub struct ReadyWriteBatch {
     pub write_batch: WriteBatch,
 }
 
+impl ReadyWriteBatch {
+    pub fn write_batch(&self) -> &WriteBatch {
+        &self.write_batch
+    }
+}
+
 #[allow(dead_code)]
 struct BucketAndWriteBatches {
     table_id: TableId,
@@ -525,14 +566,14 @@ impl RecordAppendResult {
 pub struct ReadyCheckResult {
     pub ready_nodes: HashSet<ServerNode>,
     pub next_ready_check_delay_ms: i64,
-    pub unknown_leader_tables: HashSet<TablePath>,
+    pub unknown_leader_tables: HashSet<Arc<PhysicalTablePath>>,
 }
 
 impl ReadyCheckResult {
     pub fn new(
         ready_nodes: HashSet<ServerNode>,
         next_ready_check_delay_ms: i64,
-        unknown_leader_tables: HashSet<TablePath>,
+        unknown_leader_tables: HashSet<Arc<PhysicalTablePath>>,
     ) -> Self {
         ReadyCheckResult {
             ready_nodes,
@@ -547,17 +588,20 @@ mod tests {
     use super::*;
     use crate::metadata::TablePath;
     use crate::row::{Datum, GenericRow};
-    use crate::test_utils::build_cluster;
+    use crate::test_utils::{build_cluster, build_table_info};
     use std::sync::Arc;
 
     #[tokio::test]
     async fn re_enqueue_increments_attempts() -> Result<()> {
         let config = Config::default();
         let accumulator = RecordAccumulator::new(config);
-        let table_path = Arc::new(TablePath::new("db".to_string(), 
"tbl".to_string()));
-        let cluster = Arc::new(build_cluster(table_path.as_ref(), 1, 1));
+        let table_path = TablePath::new("db".to_string(), "tbl".to_string());
+        let physical_table_path = 
Arc::new(PhysicalTablePath::of(Arc::new(table_path.clone())));
+        let table_info = Arc::new(build_table_info(table_path.clone(), 1, 1));
+        let cluster = Arc::new(build_cluster(&table_path, 1, 1));
         let record = WriteRecord::for_append(
-            table_path.clone(),
+            table_info,
+            physical_table_path,
             1,
             GenericRow {
                 values: vec![Datum::Int32(1)],
diff --git a/crates/fluss/src/client/write/batch.rs 
b/crates/fluss/src/client/write/batch.rs
index 41561d4..da30c8a 100644
--- a/crates/fluss/src/client/write/batch.rs
+++ b/crates/fluss/src/client/write/batch.rs
@@ -15,12 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::BucketId;
 use crate::client::broadcast::{BatchWriteResult, BroadcastOnce};
 use crate::client::{Record, ResultHandle, WriteRecord};
 use crate::compression::ArrowCompressionInfo;
 use crate::error::{Error, Result};
-use crate::metadata::{KvFormat, RowType, TablePath};
+use crate::metadata::{KvFormat, PhysicalTablePath, RowType};
 use crate::record::MemoryLogRecordsArrowBuilder;
 use crate::record::kv::KvRecordBatchBuilder;
 use bytes::Bytes;
@@ -28,12 +27,10 @@ use std::cmp::max;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
 
-#[allow(dead_code)]
 pub struct InnerWriteBatch {
     batch_id: i64,
-    table_path: TablePath,
+    physical_table_path: Arc<PhysicalTablePath>,
     create_ms: i64,
-    bucket_id: BucketId,
     results: BroadcastOnce<BatchWriteResult>,
     completed: AtomicBool,
     attempts: AtomicI32,
@@ -41,12 +38,11 @@ pub struct InnerWriteBatch {
 }
 
 impl InnerWriteBatch {
-    fn new(batch_id: i64, table_path: TablePath, create_ms: i64, bucket_id: 
BucketId) -> Self {
+    fn new(batch_id: i64, physical_table_path: Arc<PhysicalTablePath>, 
create_ms: i64) -> Self {
         InnerWriteBatch {
             batch_id,
-            table_path,
+            physical_table_path,
             create_ms,
-            bucket_id,
             results: Default::default(),
             completed: AtomicBool::new(false),
             attempts: AtomicI32::new(0),
@@ -74,8 +70,8 @@ impl InnerWriteBatch {
         self.drained_ms = max(self.drained_ms, now_ms);
     }
 
-    fn table_path(&self) -> &TablePath {
-        &self.table_path
+    fn physical_table_path(&self) -> &Arc<PhysicalTablePath> {
+        &self.physical_table_path
     }
 
     fn attempts(&self) -> i32 {
@@ -165,8 +161,8 @@ impl WriteBatch {
         self.inner_batch().batch_id
     }
 
-    pub fn table_path(&self) -> &TablePath {
-        self.inner_batch().table_path()
+    pub fn physical_table_path(&self) -> &Arc<PhysicalTablePath> {
+        self.inner_batch().physical_table_path()
     }
 
     pub fn attempts(&self) -> i32 {
@@ -192,15 +188,14 @@ impl ArrowLogWriteBatch {
     #[allow(clippy::too_many_arguments)]
     pub fn new(
         batch_id: i64,
-        table_path: TablePath,
+        physical_table_path: Arc<PhysicalTablePath>,
         schema_id: i32,
         arrow_compression_info: ArrowCompressionInfo,
         row_type: &RowType,
-        bucket_id: BucketId,
         create_ms: i64,
         to_append_record_batch: bool,
     ) -> Result<Self> {
-        let base = InnerWriteBatch::new(batch_id, table_path, create_ms, 
bucket_id);
+        let base = InnerWriteBatch::new(batch_id, physical_table_path, 
create_ms);
         Ok(Self {
             write_batch: base,
             arrow_builder: MemoryLogRecordsArrowBuilder::new(
@@ -273,15 +268,14 @@ impl KvWriteBatch {
     #[allow(clippy::too_many_arguments)]
     pub fn new(
         batch_id: i64,
-        table_path: TablePath,
+        physical_table_path: Arc<PhysicalTablePath>,
         schema_id: i32,
         write_limit: usize,
         kv_format: KvFormat,
-        bucket_id: BucketId,
         target_columns: Option<Arc<Vec<usize>>>,
         create_ms: i64,
     ) -> Self {
-        let base = InnerWriteBatch::new(batch_id, table_path, create_ms, 
bucket_id);
+        let base = InnerWriteBatch::new(batch_id, physical_table_path, 
create_ms);
         Self {
             write_batch: base,
             kv_batch_builder: KvRecordBatchBuilder::new(schema_id, 
write_limit, kv_format),
@@ -367,19 +361,22 @@ mod tests {
     use super::*;
     use crate::client::{RowBytes, WriteFormat};
     use crate::metadata::TablePath;
+    use crate::test_utils::build_table_info;
 
     #[test]
     fn complete_only_once() {
-        let batch =
-            InnerWriteBatch::new(1, TablePath::new("db".to_string(), 
"tbl".to_string()), 0, 0);
+        let table_path = TablePath::new("db".to_string(), "tbl".to_string());
+        let physical_path = PhysicalTablePath::of(Arc::new(table_path));
+        let batch = InnerWriteBatch::new(1, Arc::new(physical_path), 0);
         assert!(batch.complete(Ok(())));
         
assert!(!batch.complete(Err(crate::client::broadcast::Error::Dropped)));
     }
 
     #[test]
     fn attempts_increment_on_reenqueue() {
-        let batch =
-            InnerWriteBatch::new(1, TablePath::new("db".to_string(), 
"tbl".to_string()), 0, 0);
+        let table_path = TablePath::new("db".to_string(), "tbl".to_string());
+        let physical_path = PhysicalTablePath::of(Arc::new(table_path));
+        let batch = InnerWriteBatch::new(1, Arc::new(physical_path), 0);
         assert_eq!(batch.attempts(), 0);
         batch.re_enqueued();
         assert_eq!(batch.attempts(), 1);
@@ -401,12 +398,14 @@ mod tests {
             DataField::new("name".to_string(), DataTypes::string(), None),
         ]);
         let table_path = TablePath::new("db".to_string(), "tbl".to_string());
+        let table_info = Arc::new(build_table_info(table_path.clone(), 1, 1));
+        let physical_table_path = 
Arc::new(PhysicalTablePath::of(Arc::new(table_path)));
 
         // Test 1: RowAppendRecordBatchBuilder (to_append_record_batch=false)
         {
             let mut batch = ArrowLogWriteBatch::new(
                 1,
-                table_path.clone(),
+                Arc::clone(&physical_table_path),
                 1,
                 ArrowCompressionInfo {
                     compression_type: ArrowCompressionType::None,
@@ -414,7 +413,6 @@ mod tests {
                 },
                 &row_type,
                 0,
-                0,
                 false,
             )
             .unwrap();
@@ -424,7 +422,12 @@ mod tests {
                 let mut row = GenericRow::new(2);
                 row.set_field(0, 1_i32);
                 row.set_field(1, "hello");
-                let record = 
WriteRecord::for_append(Arc::new(table_path.clone()), 1, row);
+                let record = WriteRecord::for_append(
+                    Arc::clone(&table_info),
+                    Arc::clone(&physical_table_path),
+                    1,
+                    row,
+                );
                 batch.try_append(&record).unwrap();
             }
 
@@ -446,7 +449,7 @@ mod tests {
         {
             let mut batch = ArrowLogWriteBatch::new(
                 1,
-                table_path.clone(),
+                physical_table_path.clone(),
                 1,
                 ArrowCompressionInfo {
                     compression_type: ArrowCompressionType::None,
@@ -454,7 +457,6 @@ mod tests {
                 },
                 &row_type,
                 0,
-                0,
                 true,
             )
             .unwrap();
@@ -472,8 +474,12 @@ mod tests {
             )
             .unwrap();
 
-            let record =
-                
WriteRecord::for_append_record_batch(Arc::new(table_path.clone()), 1, 
record_batch);
+            let record = WriteRecord::for_append_record_batch(
+                Arc::clone(&table_info),
+                Arc::clone(&physical_table_path),
+                1,
+                record_batch,
+            );
             batch.try_append(&record).unwrap();
 
             let estimated_size = batch.estimated_size_in_bytes();
@@ -496,21 +502,23 @@ mod tests {
         use crate::metadata::KvFormat;
 
         let table_path = TablePath::new("db".to_string(), "tbl".to_string());
+        let table_info = Arc::new(build_table_info(table_path.clone(), 1, 1));
+        let physical_path = 
Arc::new(PhysicalTablePath::of(Arc::new(table_path)));
 
         let mut batch = KvWriteBatch::new(
             1,
-            table_path.clone(),
+            Arc::clone(&physical_path),
             1,
             KvWriteBatch::DEFAULT_WRITE_LIMIT,
             KvFormat::COMPACTED,
-            0,
             None,
             0,
         );
 
         for _ in 0..200 {
             let record = WriteRecord::for_upsert(
-                Arc::new(table_path.clone()),
+                Arc::clone(&table_info),
+                Arc::clone(&physical_path),
                 1,
                 Bytes::from(vec![1_u8, 2_u8, 3_u8]),
                 None,
diff --git a/crates/fluss/src/client/write/bucket_assigner.rs 
b/crates/fluss/src/client/write/bucket_assigner.rs
index 817101a..7fcd20b 100644
--- a/crates/fluss/src/client/write/bucket_assigner.rs
+++ b/crates/fluss/src/client/write/bucket_assigner.rs
@@ -19,9 +19,10 @@ use crate::bucketing::BucketingFunction;
 use crate::cluster::Cluster;
 use crate::error::Error::IllegalArgument;
 use crate::error::Result;
-use crate::metadata::TablePath;
+use crate::metadata::PhysicalTablePath;
 use bytes::Bytes;
 use rand::Rng;
+use std::sync::Arc;
 use std::sync::atomic::{AtomicI32, Ordering};
 
 pub trait BucketAssigner: Sync + Send {
@@ -34,12 +35,12 @@ pub trait BucketAssigner: Sync + Send {
 
 #[derive(Debug)]
 pub struct StickyBucketAssigner {
-    table_path: TablePath,
+    table_path: Arc<PhysicalTablePath>,
     current_bucket_id: AtomicI32,
 }
 
 impl StickyBucketAssigner {
-    pub fn new(table_path: TablePath) -> Self {
+    pub fn new(table_path: Arc<PhysicalTablePath>) -> Self {
         Self {
             table_path,
             current_bucket_id: AtomicI32::new(-1),
@@ -55,7 +56,7 @@ impl StickyBucketAssigner {
                 let mut rng = rand::rng();
                 let mut random: i32 = rng.random();
                 random &= i32::MAX;
-                new_bucket = random % 
cluster.get_bucket_count(&self.table_path);
+                new_bucket = random % 
cluster.get_bucket_count(self.table_path.get_table_path());
             } else if available_buckets.len() == 1 {
                 new_bucket = available_buckets[0].table_bucket.bucket_id();
             } else {
@@ -155,12 +156,15 @@ mod tests {
     use crate::cluster::Cluster;
     use crate::metadata::TablePath;
     use crate::test_utils::build_cluster;
+    use std::sync::Arc;
 
     #[test]
     fn sticky_bucket_assigner_picks_available_bucket() {
         let table_path = TablePath::new("db".to_string(), "tbl".to_string());
         let cluster = build_cluster(&table_path, 1, 2);
-        let assigner = StickyBucketAssigner::new(table_path);
+        let assigner = 
StickyBucketAssigner::new(Arc::new(PhysicalTablePath::of(Arc::new(
+            table_path.clone(),
+        ))));
         let bucket = assigner.assign_bucket(None, &cluster).expect("bucket");
         assert!((0..2).contains(&bucket));
 
@@ -174,7 +178,7 @@ mod tests {
         let assigner = HashBucketAssigner::new(3, <dyn 
BucketingFunction>::of(None));
         let cluster = Cluster::default();
         let err = assigner.assign_bucket(None, &cluster).unwrap_err();
-        assert!(matches!(err, crate::error::Error::IllegalArgument { .. }));
+        assert!(matches!(err, IllegalArgument { .. }));
     }
 
     #[test]
diff --git a/crates/fluss/src/client/write/mod.rs 
b/crates/fluss/src/client/write/mod.rs
index dcc6795..868b582 100644
--- a/crates/fluss/src/client/write/mod.rs
+++ b/crates/fluss/src/client/write/mod.rs
@@ -20,7 +20,8 @@ mod batch;
 
 use crate::client::broadcast::{self as client_broadcast, BatchWriteResult, 
BroadcastOnceReceiver};
 use crate::error::Error;
-use crate::metadata::TablePath;
+use crate::metadata::{PhysicalTablePath, TableInfo};
+
 use crate::row::GenericRow;
 pub use accumulator::*;
 use arrow::array::RecordBatch;
@@ -40,16 +41,21 @@ pub use writer_client::WriterClient;
 #[allow(dead_code)]
 pub struct WriteRecord<'a> {
     record: Record<'a>,
-    table_path: Arc<TablePath>,
+    physical_table_path: Arc<PhysicalTablePath>,
     bucket_key: Option<Bytes>,
     schema_id: i32,
     write_format: WriteFormat,
+    table_info: Arc<TableInfo>,
 }
 
 impl<'a> WriteRecord<'a> {
     pub fn record(&self) -> &Record<'a> {
         &self.record
     }
+
+    pub fn physical_table_path(&self) -> &Arc<PhysicalTablePath> {
+        &self.physical_table_path
+    }
 }
 
 pub enum Record<'a> {
@@ -102,10 +108,16 @@ impl<'a> KvWriteRecord<'a> {
 }
 
 impl<'a> WriteRecord<'a> {
-    pub fn for_append(table_path: Arc<TablePath>, schema_id: i32, row: 
GenericRow<'a>) -> Self {
+    pub fn for_append(
+        table_info: Arc<TableInfo>,
+        physical_table_path: Arc<PhysicalTablePath>,
+        schema_id: i32,
+        row: GenericRow<'a>,
+    ) -> Self {
         Self {
+            table_info,
             record: Record::Log(LogWriteRecord::Generic(row)),
-            table_path,
+            physical_table_path,
             bucket_key: None,
             schema_id,
             write_format: WriteFormat::ArrowLog,
@@ -113,21 +125,25 @@ impl<'a> WriteRecord<'a> {
     }
 
     pub fn for_append_record_batch(
-        table_path: Arc<TablePath>,
+        table_info: Arc<TableInfo>,
+        physical_table_path: Arc<PhysicalTablePath>,
         schema_id: i32,
         row: RecordBatch,
     ) -> Self {
         Self {
+            table_info,
             record: Record::Log(LogWriteRecord::RecordBatch(Arc::new(row))),
-            table_path,
+            physical_table_path,
             bucket_key: None,
             schema_id,
             write_format: WriteFormat::ArrowLog,
         }
     }
 
+    #[allow(clippy::too_many_arguments)]
     pub fn for_upsert(
-        table_path: Arc<TablePath>,
+        table_info: Arc<TableInfo>,
+        physical_table_path: Arc<PhysicalTablePath>,
         schema_id: i32,
         key: Bytes,
         bucket_key: Option<Bytes>,
@@ -136,8 +152,9 @@ impl<'a> WriteRecord<'a> {
         row_bytes: Option<RowBytes<'a>>,
     ) -> Self {
         Self {
+            table_info,
             record: Record::Kv(KvWriteRecord::new(key, target_columns, 
row_bytes)),
-            table_path,
+            physical_table_path,
             bucket_key,
             schema_id,
             write_format,
diff --git a/crates/fluss/src/client/write/sender.rs 
b/crates/fluss/src/client/write/sender.rs
index 905ef80..6a7dad0 100644
--- a/crates/fluss/src/client/write/sender.rs
+++ b/crates/fluss/src/client/write/sender.rs
@@ -15,20 +15,20 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::TableId;
 use crate::client::broadcast;
 use crate::client::metadata::Metadata;
 use crate::client::write::batch::WriteBatch;
 use crate::client::{ReadyWriteBatch, RecordAccumulator};
 use crate::error::Error::UnexpectedError;
 use crate::error::{FlussError, Result};
-use crate::metadata::{TableBucket, TablePath};
+use crate::metadata::{PhysicalTablePath, TableBucket, TablePath};
 use crate::proto::{
     PbProduceLogRespForBucket, PbPutKvRespForBucket, ProduceLogResponse, 
PutKvResponse,
 };
 use crate::rpc::ServerConnection;
 use crate::rpc::message::{ProduceLogRequest, PutKvRequest};
-use log::warn;
+use crate::{PartitionId, TableId};
+use log::{debug, warn};
 use parking_lot::Mutex;
 use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
@@ -82,9 +82,39 @@ impl Sender {
 
         // Update metadata if needed
         if !ready_check_result.unknown_leader_tables.is_empty() {
-            self.metadata
-                
.update_tables_metadata(&ready_check_result.unknown_leader_tables.iter().collect())
-                .await?;
+            let mut table_paths: HashSet<&TablePath> = HashSet::new();
+            let mut physical_table_paths: HashSet<&Arc<PhysicalTablePath>> = 
HashSet::new();
+
+            for unknown_paths in 
ready_check_result.unknown_leader_tables.iter() {
+                if unknown_paths.get_partition_name().is_some() {
+                    physical_table_paths.insert(unknown_paths);
+                } else {
+                    table_paths.insert(unknown_paths.get_table_path());
+                }
+            }
+
+            if let Err(e) = self
+                .metadata
+                .update_tables_metadata(&table_paths, &physical_table_paths, 
vec![])
+                .await
+            {
+                match &e {
+                    crate::error::Error::FlussAPIError { api_error }
+                        if api_error.code == 
FlussError::PartitionNotExists.code() =>
+                    {
+                        warn!(
+                            "Partition does not exist during metadata update, 
continuing: {}",
+                            api_error
+                        );
+                    }
+                    _ => return Err(e),
+                }
+            }
+
+            debug!(
+                "Client update metadata due to unknown leader tables from the 
batched records: {:?}",
+                ready_check_result.unknown_leader_tables
+            );
         }
 
         if ready_check_result.ready_nodes.is_empty() {
@@ -327,10 +357,15 @@ impl Sender {
         response: R,
     ) -> Result<()> {
         let mut invalid_metadata_tables: HashSet<TablePath> = HashSet::new();
+        let mut invalid_physical_table_paths: HashSet<Arc<PhysicalTablePath>> 
= HashSet::new();
         let mut pending_buckets: HashSet<TableBucket> = 
request_buckets.iter().cloned().collect();
 
         for bucket_resp in response.buckets_resp() {
-            let tb = TableBucket::new(table_id, bucket_resp.bucket_id());
+            let tb = TableBucket::new_with_partition(
+                table_id,
+                bucket_resp.partition_id(),
+                bucket_resp.bucket_id(),
+            );
             let Some(ready_batch) = records_by_bucket.remove(&tb) else {
                 panic!("Missing ready batch for table bucket {tb}");
             };
@@ -343,11 +378,13 @@ impl Sender {
                         .error_message()
                         .cloned()
                         .unwrap_or_else(|| error.message().to_string());
-                    if let Some(table_path) = self
+                    if let Some(physical_table_path) = self
                         .handle_write_batch_error(ready_batch, error, message)
                         .await?
                     {
-                        invalid_metadata_tables.insert(table_path);
+                        invalid_metadata_tables
+                            
.insert(physical_table_path.get_table_path().clone());
+                        
invalid_physical_table_paths.insert(physical_table_path);
                     }
                 }
                 _ => self.complete_batch(ready_batch),
@@ -356,7 +393,7 @@ impl Sender {
 
         for bucket in pending_buckets {
             if let Some(ready_batch) = records_by_bucket.remove(&bucket) {
-                if let Some(table_path) = self
+                if let Some(physical_table_path) = self
                     .handle_write_batch_error(
                         ready_batch,
                         FlussError::UnknownServerError,
@@ -364,12 +401,13 @@ impl Sender {
                     )
                     .await?
                 {
-                    invalid_metadata_tables.insert(table_path);
+                    
invalid_metadata_tables.insert(physical_table_path.get_table_path().clone());
+                    invalid_physical_table_paths.insert(physical_table_path);
                 }
             }
         }
 
-        self.update_metadata_if_needed(invalid_metadata_tables)
+        self.update_metadata_if_needed(invalid_metadata_tables, 
invalid_physical_table_paths)
             .await;
         Ok(())
     }
@@ -398,15 +436,18 @@ impl Sender {
         message: String,
     ) -> Result<()> {
         let mut invalid_metadata_tables: HashSet<TablePath> = HashSet::new();
+        let mut invalid_physical_table_paths: HashSet<Arc<PhysicalTablePath>> 
= HashSet::new();
+
         for batch in batches {
-            if let Some(table_path) = self
+            if let Some(physical_table_path) = self
                 .handle_write_batch_error(batch, error, message.clone())
                 .await?
             {
-                invalid_metadata_tables.insert(table_path);
+                
invalid_metadata_tables.insert(physical_table_path.get_table_path().clone());
+                invalid_physical_table_paths.insert(physical_table_path);
             }
         }
-        self.update_metadata_if_needed(invalid_metadata_tables)
+        self.update_metadata_if_needed(invalid_metadata_tables, 
invalid_physical_table_paths)
             .await;
         Ok(())
     }
@@ -432,20 +473,22 @@ impl Sender {
         ready_write_batch: ReadyWriteBatch,
         error: FlussError,
         message: String,
-    ) -> Result<Option<TablePath>> {
-        let table_path = ready_write_batch.write_batch.table_path().clone();
+    ) -> Result<Option<Arc<PhysicalTablePath>>> {
+        let physical_table_path = 
Arc::clone(ready_write_batch.write_batch.physical_table_path());
         if self.can_retry(&ready_write_batch, error) {
             warn!(
-                "Retrying write batch for {table_path} on bucket {} after 
error {error:?}: {message}",
+                "Retrying write batch for {} on bucket {} after error 
{error:?}: {message}",
+                physical_table_path.as_ref(),
                 ready_write_batch.table_bucket.bucket_id()
             );
             self.re_enqueue_batch(ready_write_batch).await;
-            return 
Ok(Self::is_invalid_metadata_error(error).then_some(table_path));
+            return 
Ok(Self::is_invalid_metadata_error(error).then_some(physical_table_path));
         }
 
         if error == FlussError::DuplicateSequenceException {
             warn!(
-                "Duplicate sequence for {table_path} on bucket {}: {message}",
+                "Duplicate sequence for {} on bucket {}: {message}",
+                physical_table_path.as_ref(),
                 ready_write_batch.table_bucket.bucket_id()
             );
             self.complete_batch(ready_write_batch);
@@ -459,7 +502,7 @@ impl Sender {
                 message,
             },
         );
-        Ok(Self::is_invalid_metadata_error(error).then_some(table_path))
+        
Ok(Self::is_invalid_metadata_error(error).then_some(physical_table_path))
     }
 
     async fn re_enqueue_batch(&self, ready_write_batch: ReadyWriteBatch) {
@@ -484,12 +527,22 @@ impl Sender {
             && Self::is_retriable_error(error)
     }
 
-    async fn update_metadata_if_needed(&self, table_paths: HashSet<TablePath>) 
{
+    async fn update_metadata_if_needed(
+        &self,
+        table_paths: HashSet<TablePath>,
+        physical_table_path: HashSet<Arc<PhysicalTablePath>>,
+    ) {
         if table_paths.is_empty() {
             return;
         }
         let table_path_refs: HashSet<&TablePath> = 
table_paths.iter().collect();
-        if let Err(e) = 
self.metadata.update_tables_metadata(&table_path_refs).await {
+        let physical_table_path_refs: HashSet<&Arc<PhysicalTablePath>> =
+            physical_table_path.iter().collect();
+        if let Err(e) = self
+            .metadata
+            .update_tables_metadata(&table_path_refs, 
&physical_table_path_refs, vec![])
+            .await
+        {
             warn!("Failed to update metadata after write error: {e:?}");
         }
     }
@@ -536,6 +589,8 @@ trait BucketResponse {
     fn bucket_id(&self) -> i32;
     fn error_code(&self) -> Option<i32>;
     fn error_message(&self) -> Option<&String>;
+
+    fn partition_id(&self) -> Option<PartitionId>;
 }
 
 impl BucketResponse for PbProduceLogRespForBucket {
@@ -548,6 +603,10 @@ impl BucketResponse for PbProduceLogRespForBucket {
     fn error_message(&self) -> Option<&String> {
         self.error_message.as_ref()
     }
+
+    fn partition_id(&self) -> Option<PartitionId> {
+        self.partition_id
+    }
 }
 
 impl BucketResponse for PbPutKvRespForBucket {
@@ -560,6 +619,10 @@ impl BucketResponse for PbPutKvRespForBucket {
     fn error_message(&self) -> Option<&String> {
         self.error_message.as_ref()
     }
+
+    fn partition_id(&self) -> Option<PartitionId> {
+        self.partition_id
+    }
 }
 
 trait WriteResponse {
@@ -587,11 +650,11 @@ mod tests {
     use crate::client::WriteRecord;
     use crate::cluster::Cluster;
     use crate::config::Config;
-    use crate::metadata::TablePath;
+    use crate::metadata::{PhysicalTablePath, TablePath};
     use crate::proto::{PbProduceLogRespForBucket, ProduceLogResponse};
     use crate::row::{Datum, GenericRow};
     use crate::rpc::FlussError;
-    use crate::test_utils::build_cluster_arc;
+    use crate::test_utils::{build_cluster_arc, build_table_info};
     use std::collections::{HashMap, HashSet};
 
     async fn build_ready_batch(
@@ -599,8 +662,11 @@ mod tests {
         cluster: Arc<Cluster>,
         table_path: Arc<TablePath>,
     ) -> Result<(ReadyWriteBatch, crate::client::ResultHandle)> {
+        let table_info = 
Arc::new(build_table_info(table_path.as_ref().clone(), 1, 1));
+        let physical_table_path = Arc::new(PhysicalTablePath::of(table_path));
         let record = WriteRecord::for_append(
-            table_path,
+            table_info,
+            physical_table_path,
             1,
             GenericRow {
                 values: vec![Datum::Int32(1)],
diff --git a/crates/fluss/src/client/write/writer_client.rs 
b/crates/fluss/src/client/write/writer_client.rs
index 65b04f5..c386adf 100644
--- a/crates/fluss/src/client/write/writer_client.rs
+++ b/crates/fluss/src/client/write/writer_client.rs
@@ -15,20 +15,23 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::BucketId;
+use crate::bucketing::BucketingFunction;
 use crate::client::metadata::Metadata;
-use crate::client::write::bucket_assigner::{BucketAssigner, 
StickyBucketAssigner};
+use crate::client::write::bucket_assigner::{
+    BucketAssigner, HashBucketAssigner, StickyBucketAssigner,
+};
 use crate::client::write::sender::Sender;
 use crate::client::{RecordAccumulator, ResultHandle, WriteRecord};
 use crate::config::Config;
-use crate::metadata::TablePath;
+use crate::error::{Error, Result};
+use crate::metadata::{PhysicalTablePath, TableInfo};
 use bytes::Bytes;
 use dashmap::DashMap;
 use std::sync::Arc;
 use tokio::sync::mpsc;
 use tokio::task::JoinHandle;
 
-use crate::error::{Error, Result};
-
 #[allow(dead_code)]
 pub struct WriterClient {
     config: Config,
@@ -37,7 +40,7 @@ pub struct WriterClient {
     shutdown_tx: mpsc::Sender<()>,
     sender_join_handle: JoinHandle<()>,
     metadata: Arc<Metadata>,
-    bucket_assigners: DashMap<TablePath, Arc<Box<dyn BucketAssigner>>>,
+    bucket_assigners: DashMap<Arc<PhysicalTablePath>, Arc<dyn BucketAssigner>>,
 }
 
 impl WriterClient {
@@ -89,11 +92,12 @@ impl WriterClient {
     }
 
     pub async fn send(&self, record: &WriteRecord<'_>) -> Result<ResultHandle> 
{
-        let table_path = &record.table_path;
+        let physical_table_path = &record.physical_table_path;
         let cluster = self.metadata.get_cluster();
         let bucket_key = record.bucket_key.as_ref();
 
-        let (bucket_assigner, bucket_id) = self.assign_bucket(bucket_key, 
table_path)?;
+        let (bucket_assigner, bucket_id) =
+            self.assign_bucket(&record.table_info, bucket_key, 
physical_table_path)?;
 
         let mut result = self
             .accumulate
@@ -118,17 +122,19 @@ impl WriterClient {
     }
     fn assign_bucket(
         &self,
+        table_info: &Arc<TableInfo>,
         bucket_key: Option<&Bytes>,
-        table_path: &Arc<TablePath>,
-    ) -> Result<(Arc<Box<dyn BucketAssigner>>, i32)> {
+        table_path: &Arc<PhysicalTablePath>,
+    ) -> Result<(Arc<dyn BucketAssigner>, BucketId)> {
         let cluster = self.metadata.get_cluster();
         let bucket_assigner = {
             if let Some(assigner) = self.bucket_assigners.get(table_path) {
                 assigner.clone()
             } else {
-                let assigner = 
Arc::new(Self::create_bucket_assigner(table_path.as_ref()));
+                let assigner =
+                    Self::create_bucket_assigner(table_info, 
Arc::clone(table_path), bucket_key)?;
                 self.bucket_assigners
-                    .insert(table_path.as_ref().clone(), assigner.clone());
+                    .insert(Arc::clone(table_path), 
Arc::clone(&assigner.clone()));
                 assigner
             }
         };
@@ -160,8 +166,21 @@ impl WriterClient {
         Ok(())
     }
 
-    pub fn create_bucket_assigner(table_path: &TablePath) -> Box<dyn 
BucketAssigner> {
-        // always sticky
-        Box::new(StickyBucketAssigner::new(table_path.clone()))
+    pub fn create_bucket_assigner(
+        table_info: &Arc<TableInfo>,
+        table_path: Arc<PhysicalTablePath>,
+        bucket_key: Option<&Bytes>,
+    ) -> Result<Arc<dyn BucketAssigner>> {
+        if bucket_key.is_some() {
+            let datalake_format = 
table_info.get_table_config().get_datalake_format()?;
+            let function = <dyn 
BucketingFunction>::of(datalake_format.as_ref());
+            Ok(Arc::new(HashBucketAssigner::new(
+                table_info.num_buckets,
+                function,
+            )))
+        } else {
+            // TODO: Wire up to use round robin/sticky according to 
ConfigOptions.CLIENT_WRITER_BUCKET_NO_KEY_ASSIGNER
+            Ok(Arc::new(StickyBucketAssigner::new(table_path)))
+        }
     }
 }
diff --git a/crates/fluss/src/cluster/cluster.rs 
b/crates/fluss/src/cluster/cluster.rs
index d6fe0ae..1f950ad 100644
--- a/crates/fluss/src/cluster/cluster.rs
+++ b/crates/fluss/src/cluster/cluster.rs
@@ -15,16 +15,18 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::BucketId;
 use crate::cluster::{BucketLocation, ServerNode, ServerType};
+use crate::error::Error::PartitionNotExist;
 use crate::error::{Error, Result};
 use crate::metadata::{
     JsonSerde, PhysicalTablePath, TableBucket, TableDescriptor, TableInfo, 
TablePath,
 };
-use crate::proto::MetadataResponse;
+use crate::proto::{MetadataResponse, PbBucketMetadata};
 use crate::rpc::{from_pb_server_node, from_pb_table_path};
+use crate::{BucketId, PartitionId, TableId};
 use rand::random_range;
 use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
 
 static EMPTY: Vec<BucketLocation> = Vec::new();
 
@@ -33,27 +35,35 @@ pub struct Cluster {
     coordinator_server: Option<ServerNode>,
     alive_tablet_servers_by_id: HashMap<i32, ServerNode>,
     alive_tablet_servers: Vec<ServerNode>,
-    available_locations_by_path: HashMap<TablePath, Vec<BucketLocation>>,
+    available_locations_by_path: HashMap<Arc<PhysicalTablePath>, 
Vec<BucketLocation>>,
     available_locations_by_bucket: HashMap<TableBucket, BucketLocation>,
-    table_id_by_path: HashMap<TablePath, i64>,
-    table_path_by_id: HashMap<i64, TablePath>,
+    table_id_by_path: HashMap<TablePath, TableId>,
+    table_path_by_id: HashMap<TableId, TablePath>,
     table_info_by_path: HashMap<TablePath, TableInfo>,
+    partitions_id_by_path: HashMap<Arc<PhysicalTablePath>, PartitionId>,
+    partition_name_by_id: HashMap<PartitionId, String>,
 }
 
 impl Cluster {
+    #[allow(clippy::too_many_arguments)]
     pub fn new(
         coordinator_server: Option<ServerNode>,
         alive_tablet_servers_by_id: HashMap<i32, ServerNode>,
-        available_locations_by_path: HashMap<TablePath, Vec<BucketLocation>>,
+        available_locations_by_path: HashMap<Arc<PhysicalTablePath>, 
Vec<BucketLocation>>,
         available_locations_by_bucket: HashMap<TableBucket, BucketLocation>,
-        table_id_by_path: HashMap<TablePath, i64>,
+        table_id_by_path: HashMap<TablePath, TableId>,
         table_info_by_path: HashMap<TablePath, TableInfo>,
+        partitions_id_by_path: HashMap<Arc<PhysicalTablePath>, PartitionId>,
     ) -> Self {
         let alive_tablet_servers = 
alive_tablet_servers_by_id.values().cloned().collect();
         let table_path_by_id = table_id_by_path
             .iter()
             .map(|(path, table_id)| (*table_id, path.clone()))
             .collect();
+        let partition_name_by_id = partitions_id_by_path
+            .iter()
+            .filter_map(|(path, id)| path.get_partition_name().map(|name| 
(*id, name.clone())))
+            .collect();
         Cluster {
             coordinator_server,
             alive_tablet_servers_by_id,
@@ -63,10 +73,12 @@ impl Cluster {
             table_id_by_path,
             table_path_by_id,
             table_info_by_path,
+            partitions_id_by_path,
+            partition_name_by_id,
         }
     }
 
-    pub fn invalidate_server(&self, server_id: &i32, table_ids: Vec<i64>) -> 
Self {
+    pub fn invalidate_server(&self, server_id: &i32, table_ids: Vec<TableId>) 
-> Self {
         let alive_tablet_servers_by_id = self
             .alive_tablet_servers_by_id
             .iter()
@@ -89,6 +101,7 @@ impl Cluster {
             available_locations_by_bucket,
             self.table_id_by_path.clone(),
             self.table_info_by_path.clone(),
+            self.partitions_id_by_path.clone(),
         )
     }
 
@@ -110,6 +123,7 @@ impl Cluster {
             available_locations_by_bucket,
             self.table_id_by_path.clone(),
             self.table_info_by_path.clone(),
+            self.partitions_id_by_path.clone(),
         )
     }
 
@@ -123,6 +137,8 @@ impl Cluster {
             table_id_by_path,
             table_path_by_id,
             table_info_by_path,
+            partitions_id_by_path,
+            partition_name_by_id,
         } = cluster;
         self.coordinator_server = coordinator_server;
         self.alive_tablet_servers_by_id = alive_tablet_servers_by_id;
@@ -132,26 +148,30 @@ impl Cluster {
         self.table_id_by_path = table_id_by_path;
         self.table_path_by_id = table_path_by_id;
         self.table_info_by_path = table_info_by_path;
+        self.partitions_id_by_path = partitions_id_by_path;
+        self.partition_name_by_id = partition_name_by_id;
     }
 
     fn filter_bucket_locations_by_path(
         &self,
         table_paths: &HashSet<&TablePath>,
     ) -> (
-        HashMap<TablePath, Vec<BucketLocation>>,
+        HashMap<Arc<PhysicalTablePath>, Vec<BucketLocation>>,
         HashMap<TableBucket, BucketLocation>,
     ) {
         let available_locations_by_path = self
             .available_locations_by_path
             .iter()
-            .filter(|&(path, _)| !table_paths.contains(path))
+            .filter(|&(path, _)| !table_paths.contains(path.get_table_path()))
             .map(|(path, locations)| (path.clone(), locations.clone()))
             .collect();
 
         let available_locations_by_bucket = self
             .available_locations_by_bucket
             .iter()
-            .filter(|&(_bucket, location)| 
!table_paths.contains(&location.table_path))
+            .filter(|&(_bucket, location)| {
+                
!table_paths.contains(&location.physical_table_path.get_table_path())
+            })
             .map(|(bucket, location)| (bucket.clone(), location.clone()))
             .collect();
 
@@ -175,15 +195,19 @@ impl Cluster {
 
         let mut table_id_by_path = HashMap::new();
         let mut table_info_by_path = HashMap::new();
+        let mut partitions_id_by_path = HashMap::new();
+        let mut tmp_available_locations_by_path = HashMap::new();
+        let mut tmp_available_location_by_bucket = HashMap::new();
+
         if let Some(origin) = origin_cluster {
             table_info_by_path.extend(origin.get_table_info_by_path().clone());
             table_id_by_path.extend(origin.get_table_id_by_path().clone());
+            partitions_id_by_path.extend(origin.partitions_id_by_path.clone());
+            
tmp_available_locations_by_path.extend(origin.available_locations_by_path.clone());
+            
tmp_available_location_by_bucket.extend(origin.available_locations_by_bucket.clone());
         }
 
-        // Index the bucket locations by table path, and index bucket location 
by bucket
-        let mut tmp_available_location_by_bucket = HashMap::new();
-        let mut tmp_available_locations_by_path = HashMap::new();
-
+        // iterate all table metadata
         for table_metadata in metadata_response.table_metadata {
             let table_id = table_metadata.table_id;
             let table_path = from_pb_table_path(&table_metadata.table_path);
@@ -207,39 +231,56 @@ impl Cluster {
             table_info_by_path.insert(table_path.clone(), table_info);
             table_id_by_path.insert(table_path.clone(), table_id);
 
-            // now, get bucket matadata
-            let mut found_unavailable_bucket = false;
-            let mut available_bucket_for_table = vec![];
-            let mut bucket_for_table = vec![];
-            for bucket_metadata in table_metadata.bucket_metadata {
-                let bucket_id = bucket_metadata.bucket_id;
-                let bucket = TableBucket::new(table_id, bucket_id);
-                let bucket_location;
-                if let Some(leader_id) = bucket_metadata.leader_id
-                    && let Some(server_node) = servers.get(&leader_id)
-                {
-                    bucket_location = BucketLocation::new(
-                        bucket.clone(),
-                        Some(server_node.clone()),
-                        table_path.clone(),
-                    );
-                    available_bucket_for_table.push(bucket_location.clone());
-                    tmp_available_location_by_bucket
-                        .insert(bucket.clone(), bucket_location.clone());
-                } else {
-                    found_unavailable_bucket = true;
-                    bucket_location = BucketLocation::new(bucket.clone(), 
None, table_path.clone());
-                }
-                bucket_for_table.push(bucket_location.clone());
+            let bucket_metadata = table_metadata.bucket_metadata;
+            let physical_table_path = 
Arc::new(PhysicalTablePath::of(Arc::new(table_path.clone())));
+
+            let bucket_locations = get_bucket_locations(
+                &mut servers,
+                bucket_metadata.as_slice(),
+                table_id,
+                None,
+                &physical_table_path,
+            );
+            tmp_available_locations_by_path.insert(physical_table_path, 
bucket_locations);
+        }
+
+        // iterate all partition metadata
+        for partition_metadata in metadata_response.partition_metadata {
+            let table_id = partition_metadata.table_id;
+
+            if let Some(cluster) = origin_cluster {
+                let partition_name = partition_metadata.partition_name;
+                let table_path = 
cluster.get_table_path_by_id(table_id).unwrap();
+                let partition_id = partition_metadata.partition_id;
+
+                let physical_table_path = 
Arc::new(PhysicalTablePath::of_partitioned(
+                    Arc::new(table_path.clone()),
+                    Some(partition_name),
+                ));
+
+                partitions_id_by_path.insert(Arc::clone(&physical_table_path), 
partition_id);
+
+                let bucket_locations = get_bucket_locations(
+                    &mut servers,
+                    partition_metadata.bucket_metadata.as_slice(),
+                    table_id,
+                    Some(partition_id),
+                    &physical_table_path,
+                );
+
+                tmp_available_locations_by_path.insert(physical_table_path, 
bucket_locations);
             }
+        }
 
-            if found_unavailable_bucket {
-                tmp_available_locations_by_path
-                    .insert(table_path.clone(), 
available_bucket_for_table.clone());
-            } else {
-                tmp_available_locations_by_path.insert(table_path.clone(), 
bucket_for_table);
+        for bucket_locations in &mut tmp_available_locations_by_path.values() {
+            for location in bucket_locations {
+                if location.leader().is_some() {
+                    tmp_available_location_by_bucket
+                        .insert(location.table_bucket.clone(), 
location.clone());
+                }
             }
         }
+
         Ok(Cluster::new(
             coordinator_server,
             servers,
@@ -247,6 +288,7 @@ impl Cluster {
             tmp_available_location_by_bucket,
             table_id_by_path,
             table_info_by_path,
+            partitions_id_by_path,
         ))
     }
 
@@ -269,14 +311,43 @@ impl Cluster {
 
     pub fn get_table_bucket(
         &self,
-        table_path: &TablePath,
+        physical_table_path: &PhysicalTablePath,
         bucket_id: BucketId,
     ) -> Result<TableBucket> {
-        let table_info = self.get_table(table_path)?;
-        Ok(TableBucket::new(table_info.table_id, bucket_id))
+        let table_info = self.get_table(physical_table_path.get_table_path())?;
+        let partition_id = self.get_partition_id(physical_table_path);
+
+        if physical_table_path.get_partition_name().is_some() && 
partition_id.is_none() {
+            return Err(PartitionNotExist {
+                message: format!(
+                    "The partition {} is not found in cluster",
+                    physical_table_path.get_partition_name().unwrap()
+                ),
+            });
+        }
+
+        Ok(TableBucket::new_with_partition(
+            table_info.table_id,
+            partition_id,
+            bucket_id,
+        ))
     }
 
-    pub fn get_bucket_locations_by_path(&self) -> &HashMap<TablePath, 
Vec<BucketLocation>> {
+    pub fn get_partition_id(&self, physical_table_path: &PhysicalTablePath) -> 
Option<PartitionId> {
+        self.partitions_id_by_path.get(physical_table_path).copied()
+    }
+
+    pub fn get_partition_name(&self, partition_id: PartitionId) -> 
Option<&String> {
+        self.partition_name_by_id.get(&partition_id)
+    }
+
+    pub fn get_table_id(&self, table_path: &TablePath) -> Option<i64> {
+        self.table_id_by_path.get(table_path).copied()
+    }
+
+    pub fn get_bucket_locations_by_path(
+        &self,
+    ) -> &HashMap<Arc<PhysicalTablePath>, Vec<BucketLocation>> {
         &self.available_locations_by_path
     }
 
@@ -288,13 +359,13 @@ impl Cluster {
         &self.table_id_by_path
     }
 
-    pub fn get_table_path_by_id(&self, table_id: i64) -> Option<&TablePath> {
+    pub fn get_table_path_by_id(&self, table_id: TableId) -> 
Option<&TablePath> {
         self.table_path_by_id.get(&table_id)
     }
 
     pub fn get_available_buckets_for_table_path(
         &self,
-        table_path: &TablePath,
+        table_path: &PhysicalTablePath,
     ) -> &Vec<BucketLocation> {
         self.available_locations_by_path
             .get(table_path)
@@ -327,4 +398,37 @@ impl Cluster {
     pub fn opt_get_table(&self, table_path: &TablePath) -> Option<&TableInfo> {
         self.table_info_by_path.get(table_path)
     }
+
+    pub fn get_partition_id_by_path(&self) -> &HashMap<Arc<PhysicalTablePath>, 
PartitionId> {
+        &self.partitions_id_by_path
+    }
+}
+
+fn get_bucket_locations(
+    servers: &mut HashMap<i32, ServerNode>,
+    bucket_metadata: &[PbBucketMetadata],
+    table_id: i64,
+    partition_id: Option<PartitionId>,
+    physical_table_path: &Arc<PhysicalTablePath>,
+) -> Vec<BucketLocation> {
+    let mut bucket_locations = Vec::new();
+    for metadata in bucket_metadata {
+        let bucket_id = metadata.bucket_id;
+        let bucket = TableBucket::new_with_partition(table_id, partition_id, 
bucket_id);
+
+        let server = if let Some(leader_id) = metadata.leader_id
+            && let Some(server_node) = servers.get(&leader_id)
+        {
+            Some(server_node.clone())
+        } else {
+            None
+        };
+
+        bucket_locations.push(BucketLocation::new(
+            bucket.clone(),
+            server,
+            Arc::clone(physical_table_path),
+        ));
+    }
+    bucket_locations
 }
diff --git a/crates/fluss/src/cluster/mod.rs b/crates/fluss/src/cluster/mod.rs
index f9d42e4..58e80c0 100644
--- a/crates/fluss/src/cluster/mod.rs
+++ b/crates/fluss/src/cluster/mod.rs
@@ -16,7 +16,8 @@
 // under the License.
 
 use crate::BucketId;
-use crate::metadata::{TableBucket, TablePath};
+use crate::metadata::{PhysicalTablePath, TableBucket};
+use std::sync::Arc;
 
 #[allow(clippy::module_inception)]
 mod cluster;
@@ -69,19 +70,19 @@ pub enum ServerType {
 pub struct BucketLocation {
     pub table_bucket: TableBucket,
     leader: Option<ServerNode>,
-    pub table_path: TablePath,
+    physical_table_path: Arc<PhysicalTablePath>,
 }
 
 impl BucketLocation {
     pub fn new(
         table_bucket: TableBucket,
         leader: Option<ServerNode>,
-        table_path: TablePath,
+        physical_table_path: Arc<PhysicalTablePath>,
     ) -> BucketLocation {
         BucketLocation {
             table_bucket,
             leader,
-            table_path,
+            physical_table_path,
         }
     }
 
@@ -96,4 +97,8 @@ impl BucketLocation {
     pub fn bucket_id(&self) -> BucketId {
         self.table_bucket.bucket_id()
     }
+
+    pub fn physical_table_path(&self) -> &Arc<PhysicalTablePath> {
+        &self.physical_table_path
+    }
 }
diff --git a/crates/fluss/src/error.rs b/crates/fluss/src/error.rs
index 68426d7..ef86530 100644
--- a/crates/fluss/src/error.rs
+++ b/crates/fluss/src/error.rs
@@ -99,6 +99,12 @@ pub enum Error {
     )]
     InvalidPartition { message: String },
 
+    #[snafu(
+        visibility(pub(crate)),
+        display("Fluss hitting partition not exist error {}.", message)
+    )]
+    PartitionNotExist { message: String },
+
     #[snafu(
         visibility(pub(crate)),
         display("Fluss hitting IO not supported error {}.", message)
diff --git a/crates/fluss/src/metadata/datatype.rs b/crates/fluss/src/metadata/datatype.rs
index e365237..6431d3a 100644
--- a/crates/fluss/src/metadata/datatype.rs
+++ b/crates/fluss/src/metadata/datatype.rs
@@ -930,6 +930,15 @@ impl RowType {
         self.fields.iter().map(|f| f.name.as_str()).collect()
     }
 
+    pub fn project_with_field_names(&self, field_names: &[String]) -> Result<RowType> {
+        let indices: Vec<usize> = field_names
+            .iter()
+            .filter_map(|pk| self.get_field_index(pk))
+            .collect();
+
+        self.project(indices.as_slice())
+    }
+
     pub fn project(&self, project_field_positions: &[usize]) -> Result<RowType> {
         Ok(RowType::with_nullable(
             self.nullable,
diff --git a/crates/fluss/src/metadata/table.rs b/crates/fluss/src/metadata/table.rs
index 7b93aca..ce362c4 100644
--- a/crates/fluss/src/metadata/table.rs
+++ b/crates/fluss/src/metadata/table.rs
@@ -25,6 +25,7 @@ use core::fmt;
 use serde::{Deserialize, Serialize};
 use std::collections::{HashMap, HashSet};
 use std::fmt::{Display, Formatter};
+use std::sync::Arc;
 use strum_macros::EnumString;
 
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
@@ -172,7 +173,7 @@ impl SchemaBuilder {
                 self
             }
             _ => {
-                panic!("data type msut be row type")
+                panic!("data type must be row type")
             }
         }
     }
@@ -325,7 +326,7 @@ pub struct TableDescriptorBuilder {
     schema: Option<Schema>,
     properties: HashMap<String, String>,
     custom_properties: HashMap<String, String>,
-    partition_keys: Vec<String>,
+    partition_keys: Arc<[String]>,
     comment: Option<String>,
     table_distribution: Option<TableDistribution>,
 }
@@ -374,7 +375,7 @@ impl TableDescriptorBuilder {
     }
 
     pub fn partitioned_by(mut self, partition_keys: Vec<String>) -> Self {
-        self.partition_keys = partition_keys;
+        self.partition_keys = Arc::from(partition_keys);
         self
     }
 
@@ -413,7 +414,7 @@ impl TableDescriptorBuilder {
 pub struct TableDescriptor {
     schema: Schema,
     comment: Option<String>,
-    partition_keys: Vec<String>,
+    partition_keys: Arc<[String]>,
     table_distribution: Option<TableDistribution>,
     properties: HashMap<String, String>,
     custom_properties: HashMap<String, String>,
@@ -749,19 +750,19 @@ impl TablePath {
 /// `partition_name` will be `Some(...)`; otherwise, it will be `None`.
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub struct PhysicalTablePath {
-    table_path: TablePath,
+    table_path: Arc<TablePath>,
     partition_name: Option<String>,
 }
 
 impl PhysicalTablePath {
-    pub fn of(table_path: TablePath) -> Self {
+    pub fn of(table_path: Arc<TablePath>) -> Self {
         Self {
             table_path,
             partition_name: None,
         }
     }
 
-    pub fn of_partitioned(table_path: TablePath, partition_name: Option<String>) -> Self {
+    pub fn of_partitioned(table_path: Arc<TablePath>, partition_name: Option<String>) -> Self {
         Self {
             table_path,
             partition_name,
@@ -774,7 +775,7 @@ impl PhysicalTablePath {
         partition_name: Option<String>,
     ) -> Self {
         Self {
-            table_path: TablePath::new(database_name, table_name),
+            table_path: Arc::new(TablePath::new(database_name, table_name)),
             partition_name,
         }
     }
@@ -815,7 +816,7 @@ pub struct TableInfo {
     pub primary_keys: Vec<String>,
     pub physical_primary_keys: Vec<String>,
     pub bucket_keys: Vec<String>,
-    pub partition_keys: Vec<String>,
+    pub partition_keys: Arc<[String]>,
     pub num_buckets: i32,
     pub properties: HashMap<String, String>,
     pub table_config: TableConfig,
@@ -982,7 +983,7 @@ impl TableInfo {
         schema_id: i32,
         schema: Schema,
         bucket_keys: Vec<String>,
-        partition_keys: Vec<String>,
+        partition_keys: Arc<[String]>,
         num_buckets: i32,
         properties: HashMap<String, String>,
         custom_properties: HashMap<String, String>,
@@ -1080,7 +1081,7 @@ impl TableInfo {
                 .is_auto_partition_enabled()
     }
 
-    pub fn get_partition_keys(&self) -> &[String] {
+    pub fn get_partition_keys(&self) -> &Arc<[String]> {
         &self.partition_keys
     }
 
@@ -1115,7 +1116,7 @@ impl TableInfo {
     pub fn to_table_descriptor(&self) -> Result<TableDescriptor> {
         let mut builder = TableDescriptor::builder()
             .schema(self.schema.clone())
-            .partitioned_by(self.partition_keys.clone())
+            .partitioned_by(self.partition_keys.to_vec())
             .distributed_by(Some(self.num_buckets), self.bucket_keys.clone())
             .properties(self.properties.clone())
             .custom_properties(self.custom_properties.clone());
@@ -1177,6 +1178,18 @@ impl TableBucket {
         }
     }
 
+    pub fn new_with_partition(
+        table_id: TableId,
+        partition_id: Option<PartitionId>,
+        bucket: BucketId,
+    ) -> Self {
+        TableBucket {
+            table_id,
+            partition_id,
+            bucket,
+        }
+    }
+
     pub fn table_id(&self) -> TableId {
         self.table_id
     }
@@ -1308,7 +1321,7 @@ mod tests {
             1,
             schema.clone(),
             vec!["id".to_string()],
-            vec![], // No partition keys
+            Arc::from(vec![]), // No partition keys
             1,
             properties.clone(),
             HashMap::new(),
@@ -1329,7 +1342,7 @@ mod tests {
             1,
             schema.clone(),
             vec!["id".to_string()],
-            vec![], // No partition keys
+            Arc::from(vec![]), // No partition keys
             1,
             properties.clone(),
             HashMap::new(),
@@ -1350,7 +1363,7 @@ mod tests {
             1,
             schema.clone(),
             vec!["id".to_string()],
-            vec!["name".to_string()], // Partition keys
+            Arc::from(vec!["name".to_string()]), // Partition keys
             1,
             properties.clone(),
             HashMap::new(),
@@ -1371,7 +1384,7 @@ mod tests {
             1,
             schema.clone(),
             vec!["id".to_string()],
-            vec!["name".to_string()], // Partition keys
+            Arc::from(vec!["name".to_string()]), // Partition keys
             1,
             properties.clone(),
             HashMap::new(),
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index 6340dc8..726106b 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -1450,6 +1450,7 @@ pub struct MyVec<T>(pub StreamReader<T>);
 mod tests {
     use super::*;
     use crate::metadata::{DataField, DataTypes, RowType};
+    use crate::test_utils::build_table_info;
 
     #[test]
     fn test_to_array_type() {
@@ -1932,7 +1933,7 @@ mod tests {
         use crate::compression::{
             ArrowCompressionInfo, ArrowCompressionType, DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
         };
-        use crate::metadata::TablePath;
+        use crate::metadata::{PhysicalTablePath, TablePath};
         use crate::row::GenericRow;
         use tempfile::NamedTempFile;
 
@@ -1941,7 +1942,9 @@ mod tests {
             DataField::new("id".to_string(), DataTypes::int(), None),
             DataField::new("name".to_string(), DataTypes::string(), None),
         ]);
-        let table_path = Arc::new(TablePath::new("db".to_string(), "tbl".to_string()));
+        let table_path = TablePath::new("db".to_string(), "tbl".to_string());
+        let table_info = Arc::new(build_table_info(table_path.clone(), 1, 1));
+        let physical_table_path = Arc::new(PhysicalTablePath::of(Arc::new(table_path)));
 
         let mut builder = MemoryLogRecordsArrowBuilder::new(
             1,
@@ -1956,13 +1959,15 @@ mod tests {
         let mut row = GenericRow::new(2);
         row.set_field(0, 1_i32);
         row.set_field(1, "alice");
-        let record = WriteRecord::for_append(table_path.clone(), 1, row);
+        let record =
+            WriteRecord::for_append(Arc::clone(&table_info), physical_table_path.clone(), 1, row);
         builder.append(&record)?;
 
         let mut row2 = GenericRow::new(2);
         row2.set_field(0, 2_i32);
         row2.set_field(1, "bob");
-        let record2 = WriteRecord::for_append(table_path, 2, row2);
+        let record2 =
+            WriteRecord::for_append(Arc::clone(&table_info), physical_table_path, 2, row2);
         builder.append(&record2)?;
 
         let data = builder.build()?;
diff --git a/crates/fluss/src/rpc/message/update_metadata.rs b/crates/fluss/src/rpc/message/update_metadata.rs
index a6e6288..1f0d88c 100644
--- a/crates/fluss/src/rpc/message/update_metadata.rs
+++ b/crates/fluss/src/rpc/message/update_metadata.rs
@@ -15,14 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::proto::{MetadataResponse, PbTablePath};
+use crate::metadata::{PhysicalTablePath, TablePath};
+use crate::proto::{MetadataResponse, PbPhysicalTablePath, PbTablePath};
 use crate::rpc::api_key::ApiKey;
 use crate::rpc::api_version::ApiVersion;
+use crate::rpc::frame::ReadError;
 use crate::rpc::frame::WriteError;
 use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
-
-use crate::metadata::TablePath;
-use crate::rpc::frame::ReadError;
+use std::collections::HashSet;
+use std::sync::Arc;
 
 use crate::{impl_read_version_type, impl_write_version_type, proto};
 use bytes::{Buf, BufMut};
@@ -33,7 +34,11 @@ pub struct UpdateMetadataRequest {
 }
 
 impl UpdateMetadataRequest {
-    pub fn new(table_paths: &[&TablePath]) -> Self {
+    pub fn new(
+        table_paths: &HashSet<&TablePath>,
+        physical_table_paths: &HashSet<&Arc<PhysicalTablePath>>,
+        partition_ids: Vec<i64>,
+    ) -> Self {
         UpdateMetadataRequest {
             inner_request: proto::MetadataRequest {
                 table_path: table_paths
@@ -43,8 +48,15 @@ impl UpdateMetadataRequest {
                         table_name: path.table().to_string(),
                     })
                     .collect(),
-                partitions_path: vec![],
-                partitions_id: vec![],
+                partitions_path: physical_table_paths
+                    .iter()
+                    .map(|path| PbPhysicalTablePath {
+                        database_name: path.get_database_name().to_string(),
+                        table_name: path.get_table_name().to_string(),
+                        partition_name: path.get_partition_name().map(|pn| pn.to_string()),
+                    })
+                    .collect(),
+                partitions_id: partition_ids,
             },
         }
     }
diff --git a/crates/fluss/src/test_utils.rs b/crates/fluss/src/test_utils.rs
index d1cd3ec..8e8fbe4 100644
--- a/crates/fluss/src/test_utils.rs
+++ b/crates/fluss/src/test_utils.rs
@@ -17,7 +17,8 @@
 
 use crate::cluster::{BucketLocation, Cluster, ServerNode, ServerType};
 use crate::metadata::{
-    DataField, DataTypes, Schema, TableBucket, TableDescriptor, TableInfo, TablePath,
+    DataField, DataTypes, PhysicalTablePath, Schema, TableBucket, TableDescriptor, TableInfo,
+    TablePath,
 };
 use std::collections::HashMap;
 use std::sync::Arc;
@@ -53,12 +54,15 @@ pub(crate) fn build_cluster(table_path: &TablePath, table_id: i64, buckets: i32)
         let bucket_location = BucketLocation::new(
             table_bucket.clone(),
             Some(server.clone()),
-            table_path.clone(),
+            Arc::new(PhysicalTablePath::of(Arc::new(table_path.clone()))),
         );
         bucket_locations.push(bucket_location.clone());
         locations_by_bucket.insert(table_bucket, bucket_location);
     }
-    locations_by_path.insert(table_path.clone(), bucket_locations);
+    locations_by_path.insert(
+        Arc::new(PhysicalTablePath::of(Arc::new(table_path.clone()))),
+        bucket_locations,
+    );
 
     let mut table_id_by_path = HashMap::new();
     table_id_by_path.insert(table_path.clone(), table_id);
@@ -76,6 +80,7 @@ pub(crate) fn build_cluster(table_path: &TablePath, table_id: i64, buckets: i32)
         locations_by_bucket,
         table_id_by_path,
         table_info_by_path,
+        HashMap::new(),
     )
 }
 
diff --git a/crates/fluss/tests/integration/kv_table.rs b/crates/fluss/tests/integration/kv_table.rs
index a4f2961..b2263c2 100644
--- a/crates/fluss/tests/integration/kv_table.rs
+++ b/crates/fluss/tests/integration/kv_table.rs
@@ -36,7 +36,7 @@ mod kv_table_test {
     use crate::integration::fluss_cluster::FlussTestingCluster;
     use crate::integration::utils::{create_table, get_cluster, start_cluster, stop_cluster};
     use fluss::client::UpsertWriter;
-    use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
+    use fluss::metadata::{DataTypes, PartitionSpec, Schema, TableDescriptor, TablePath};
     use fluss::row::{GenericRow, InternalRow};
     use std::sync::Arc;
 
@@ -438,6 +438,179 @@ mod kv_table_test {
             .expect("Failed to drop table");
     }
 
+    #[tokio::test]
+    async fn partitioned_table_upsert_and_lookup() {
+        let cluster = get_fluss_cluster();
+        let connection = cluster.get_fluss_connection().await;
+
+        let admin = connection.get_admin().await.expect("Failed to get admin");
+
+        let table_path =
+            TablePath::new("fluss".to_string(), "test_partitioned_kv_table".to_string());
+
+        // Create a partitioned KV table with region as partition key
+        let table_descriptor = TableDescriptor::builder()
+            .schema(
+                Schema::builder()
+                    .column("region", DataTypes::string())
+                    .column("user_id", DataTypes::int())
+                    .column("name", DataTypes::string())
+                    .column("score", DataTypes::bigint())
+                    .primary_key(vec!["region".to_string(), "user_id".to_string()])
+                    .build()
+                    .expect("Failed to build schema"),
+            )
+            .partitioned_by(vec!["region".to_string()])
+            .build()
+            .expect("Failed to build table");
+
+        create_table(&admin, &table_path, &table_descriptor).await;
+
+        // Create partitions for each region before inserting data
+        for region in &["US", "EU", "APAC"] {
+            let mut partition_map = std::collections::HashMap::new();
+            partition_map.insert("region".to_string(), region.to_string());
+            let partition_spec = PartitionSpec::new(partition_map);
+            admin
+                .create_partition(&table_path, &partition_spec, false)
+                .await
+                .expect("Failed to create partition");
+        }
+
+        let connection = cluster.get_fluss_connection().await;
+
+        let table = connection
+            .get_table(&table_path)
+            .await
+            .expect("Failed to get table");
+
+        let table_upsert = table.new_upsert().expect("Failed to create upsert");
+
+        let mut upsert_writer = table_upsert
+            .create_writer()
+            .expect("Failed to create writer");
+
+        // Insert records with different partitions
+        let test_data = [
+            ("US", 1, "Gustave", 100i64),
+            ("US", 2, "Lune", 200i64),
+            ("EU", 1, "Sciel", 150i64),
+            ("EU", 2, "Maelle", 250i64),
+            ("APAC", 1, "Noco", 300i64),
+        ];
+
+        for (region, user_id, name, score) in &test_data {
+            let mut row = GenericRow::new(4);
+            row.set_field(0, *region);
+            row.set_field(1, *user_id);
+            row.set_field(2, *name);
+            row.set_field(3, *score);
+            upsert_writer.upsert(&row).await.expect("Failed to upsert");
+        }
+
+        // Create lookuper
+        let mut lookuper = table
+            .new_lookup()
+            .expect("Failed to create lookup")
+            .create_lookuper()
+            .expect("Failed to create lookuper");
+
+        // Lookup records - the lookup key includes partition key columns
+        for (region, user_id, expected_name, expected_score) in &test_data {
+            let mut key = GenericRow::new(4);
+            key.set_field(0, *region);
+            key.set_field(1, *user_id);
+
+            let result = lookuper.lookup(&key).await.expect("Failed to lookup");
+            let row = result
+                .get_single_row()
+                .expect("Failed to get row")
+                .expect("Row should exist");
+
+            assert_eq!(row.get_string(0), *region, "region mismatch");
+            assert_eq!(row.get_int(1), *user_id, "user_id mismatch");
+            assert_eq!(row.get_string(2), *expected_name, "name mismatch");
+            assert_eq!(row.get_long(3), *expected_score, "score mismatch");
+        }
+
+        // Test update within a partition
+        let mut updated_row = GenericRow::new(4);
+        updated_row.set_field(0, "US");
+        updated_row.set_field(1, 1);
+        updated_row.set_field(2, "Gustave Updated");
+        updated_row.set_field(3, 999i64);
+        upsert_writer
+            .upsert(&updated_row)
+            .await
+            .expect("Failed to upsert updated row");
+
+        // Verify the update
+        let mut key = GenericRow::new(4);
+        key.set_field(0, "US");
+        key.set_field(1, 1);
+        let result = lookuper.lookup(&key).await.expect("Failed to lookup");
+        let row = result
+            .get_single_row()
+            .expect("Failed to get row")
+            .expect("Row should exist");
+        assert_eq!(row.get_string(2), "Gustave Updated");
+        assert_eq!(row.get_long(3), 999);
+
+        // Lookup in non-existent partition should return empty result
+        let mut non_existent_key = GenericRow::new(4);
+        non_existent_key.set_field(0, "UNKNOWN_REGION");
+        non_existent_key.set_field(1, 1);
+        let result = lookuper
+            .lookup(&non_existent_key)
+            .await
+            .expect("Failed to lookup non-existent partition");
+        assert!(
+            result
+                .get_single_row()
+                .expect("Failed to get row")
+                .is_none(),
+            "Lookup in non-existent partition should return None"
+        );
+
+        // Delete a record within a partition
+        let mut delete_key = GenericRow::new(4);
+        delete_key.set_field(0, "EU");
+        delete_key.set_field(1, 1);
+        upsert_writer
+            .delete(&delete_key)
+            .await
+            .expect("Failed to delete");
+
+        // Verify deletion
+        let mut key = GenericRow::new(4);
+        key.set_field(0, "EU");
+        key.set_field(1, 1);
+        let result = lookuper.lookup(&key).await.expect("Failed to lookup");
+        assert!(
+            result
+                .get_single_row()
+                .expect("Failed to get row")
+                .is_none(),
+            "Deleted record should not exist"
+        );
+
+        // Verify other records in the same partition still exist
+        let mut key = GenericRow::new(4);
+        key.set_field(0, "EU");
+        key.set_field(1, 2);
+        let result = lookuper.lookup(&key).await.expect("Failed to lookup");
+        let row = result
+            .get_single_row()
+            .expect("Failed to get row")
+            .expect("Row should exist");
+        assert_eq!(row.get_string(2), "Maelle");
+
+        admin
+            .drop_table(&table_path, false)
+            .await
+            .expect("Failed to drop table");
+    }
+
     /// Integration test covering put and get operations for all supported datatypes.
     #[tokio::test]
     async fn all_supported_datatypes() {

Reply via email to