This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new fdd5296  feat: Implement Admin APIs for partitioning (#208)
fdd5296 is described below

commit fdd52968d031ed5b312005686c414596c1adf2d7
Author: Keith Lee <[email protected]>
AuthorDate: Sun Jan 25 01:18:59 2026 +0000

    feat: Implement Admin APIs for partitioning (#208)
---
 crates/fluss/src/client/admin.rs                   |  76 +++-
 crates/fluss/src/client/table/scanner.rs           |   3 +-
 crates/fluss/src/client/write/accumulator.rs       |   2 +-
 crates/fluss/src/client/write/sender.rs            |   3 +-
 crates/fluss/src/lib.rs                            |   4 +-
 crates/fluss/src/metadata/mod.rs                   |   2 +
 crates/fluss/src/metadata/partition.rs             | 469 +++++++++++++++++++++
 crates/fluss/src/metadata/table.rs                 |  66 ++-
 crates/fluss/src/proto/fluss_api.proto             |  36 +-
 crates/fluss/src/rpc/api_key.rs                    |  12 +
 crates/fluss/src/rpc/message/create_partition.rs   |  59 +++
 crates/fluss/src/rpc/message/drop_partition.rs     |  59 +++
 crates/fluss/src/rpc/message/list_offsets.rs       |  10 +-
 .../fluss/src/rpc/message/list_partition_infos.rs  |  63 +++
 crates/fluss/src/rpc/message/mod.rs                |   6 +
 crates/fluss/src/util/mod.rs                       |   3 +-
 crates/fluss/tests/integration/admin.rs            | 145 ++++++-
 17 files changed, 983 insertions(+), 35 deletions(-)

diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs
index 6646f97..bffe0f5 100644
--- a/crates/fluss/src/client/admin.rs
+++ b/crates/fluss/src/client/admin.rs
@@ -17,20 +17,21 @@
 
 use crate::client::metadata::Metadata;
 use crate::metadata::{
-    DatabaseDescriptor, DatabaseInfo, JsonSerde, LakeSnapshot, TableBucket, 
TableDescriptor,
-    TableInfo, TablePath,
+    DatabaseDescriptor, DatabaseInfo, JsonSerde, LakeSnapshot, PartitionInfo, 
PartitionSpec,
+    TableBucket, TableDescriptor, TableInfo, TablePath,
 };
 use crate::rpc::message::{
-    CreateDatabaseRequest, CreateTableRequest, DatabaseExistsRequest, 
DropDatabaseRequest,
-    DropTableRequest, GetDatabaseInfoRequest, GetLatestLakeSnapshotRequest, 
GetTableRequest,
-    ListDatabasesRequest, ListTablesRequest, TableExistsRequest,
+    CreateDatabaseRequest, CreatePartitionRequest, CreateTableRequest, 
DatabaseExistsRequest,
+    DropDatabaseRequest, DropPartitionRequest, DropTableRequest, 
GetDatabaseInfoRequest,
+    GetLatestLakeSnapshotRequest, GetTableRequest, ListDatabasesRequest, 
ListPartitionInfosRequest,
+    ListTablesRequest, TableExistsRequest,
 };
 use crate::rpc::message::{ListOffsetsRequest, OffsetSpec};
 use crate::rpc::{RpcClient, ServerConnection};
 
-use crate::BucketId;
 use crate::error::{Error, Result};
 use crate::proto::GetTableInfoResponse;
+use crate::{BucketId, PartitionId, TableId};
 use std::collections::HashMap;
 use std::slice::from_ref;
 use std::sync::Arc;
@@ -138,6 +139,63 @@ impl FlussAdmin {
         Ok(response.table_name)
     }
 
+    /// List all partitions in the given table.
+    pub async fn list_partition_infos(&self, table_path: &TablePath) -> 
Result<Vec<PartitionInfo>> {
+        self.list_partition_infos_with_spec(table_path, None).await
+    }
+
+    /// List partitions in the given table that match the partial partition 
spec.
+    pub async fn list_partition_infos_with_spec(
+        &self,
+        table_path: &TablePath,
+        partial_partition_spec: Option<&PartitionSpec>,
+    ) -> Result<Vec<PartitionInfo>> {
+        let response = self
+            .admin_gateway
+            .request(ListPartitionInfosRequest::new(
+                table_path,
+                partial_partition_spec,
+            ))
+            .await?;
+        Ok(response.get_partitions_info())
+    }
+
+    /// Create a new partition for a partitioned table.
+    pub async fn create_partition(
+        &self,
+        table_path: &TablePath,
+        partition_spec: &PartitionSpec,
+        ignore_if_exists: bool,
+    ) -> Result<()> {
+        let _response = self
+            .admin_gateway
+            .request(CreatePartitionRequest::new(
+                table_path,
+                partition_spec,
+                ignore_if_exists,
+            ))
+            .await?;
+        Ok(())
+    }
+
+    /// Drop a partition from a partitioned table.
+    pub async fn drop_partition(
+        &self,
+        table_path: &TablePath,
+        partition_spec: &PartitionSpec,
+        ignore_if_not_exists: bool,
+    ) -> Result<()> {
+        let _response = self
+            .admin_gateway
+            .request(DropPartitionRequest::new(
+                table_path,
+                partition_spec,
+                ignore_if_not_exists,
+            ))
+            .await?;
+        Ok(())
+    }
+
     /// Check if a table exists
     pub async fn table_exists(&self, table_path: &TablePath) -> Result<bool> {
         let response = self
@@ -263,13 +321,13 @@ impl FlussAdmin {
 
     fn prepare_list_offsets_requests(
         &self,
-        table_id: i64,
-        partition_id: Option<i64>,
+        table_id: TableId,
+        partition_id: Option<PartitionId>,
         buckets: &[BucketId],
         offset_spec: OffsetSpec,
     ) -> Result<HashMap<i32, ListOffsetsRequest>> {
         let cluster = self.metadata.get_cluster();
-        let mut node_for_bucket_list: HashMap<i32, Vec<i32>> = HashMap::new();
+        let mut node_for_bucket_list: HashMap<i32, Vec<BucketId>> = 
HashMap::new();
 
         for bucket_id in buckets {
             let table_bucket = TableBucket::new(table_id, *bucket_id);
diff --git a/crates/fluss/src/client/table/scanner.rs 
b/crates/fluss/src/client/table/scanner.rs
index 8712650..61ed56e 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -27,6 +27,7 @@ use std::{
 };
 use tempfile::TempDir;
 
+use crate::TableId;
 use crate::client::connection::FlussConnection;
 use crate::client::credentials::CredentialsCache;
 use crate::client::metadata::Metadata;
@@ -264,7 +265,7 @@ pub struct RecordBatchLogScanner {
 /// Private shared implementation for both scanner types
 struct LogScannerInner {
     table_path: TablePath,
-    table_id: i64,
+    table_id: TableId,
     metadata: Arc<Metadata>,
     log_scanner_status: Arc<LogScannerStatus>,
     log_fetcher: LogFetcher,
diff --git a/crates/fluss/src/client/write/accumulator.rs 
b/crates/fluss/src/client/write/accumulator.rs
index 46c822c..96114fb 100644
--- a/crates/fluss/src/client/write/accumulator.rs
+++ b/crates/fluss/src/client/write/accumulator.rs
@@ -401,7 +401,7 @@ impl RecordAccumulator {
         ready_write_batch.write_batch.re_enqueued();
         let table_path = ready_write_batch.write_batch.table_path().clone();
         let bucket_id = ready_write_batch.table_bucket.bucket_id();
-        let table_id = 
u64::try_from(ready_write_batch.table_bucket.table_id()).unwrap_or(0);
+        let table_id = ready_write_batch.table_bucket.table_id();
 
         let dq = {
             let mut binding =
diff --git a/crates/fluss/src/client/write/sender.rs 
b/crates/fluss/src/client/write/sender.rs
index ceed245..1ffda58 100644
--- a/crates/fluss/src/client/write/sender.rs
+++ b/crates/fluss/src/client/write/sender.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::TableId;
 use crate::client::broadcast;
 use crate::client::metadata::Metadata;
 use crate::client::write::batch::WriteBatch;
@@ -144,7 +145,7 @@ impl Sender {
             return Ok(());
         }
         let mut records_by_bucket = HashMap::new();
-        let mut write_batch_by_table: HashMap<i64, Vec<TableBucket>> = 
HashMap::new();
+        let mut write_batch_by_table: HashMap<TableId, Vec<TableBucket>> = 
HashMap::new();
 
         for batch in batches {
             let table_bucket = batch.table_bucket.clone();
diff --git a/crates/fluss/src/lib.rs b/crates/fluss/src/lib.rs
index e8d822f..f079db2 100644
--- a/crates/fluss/src/lib.rs
+++ b/crates/fluss/src/lib.rs
@@ -34,8 +34,8 @@ mod util;
 #[cfg(test)]
 mod test_utils;
 
-pub type TableId = u64;
-pub type PartitionId = u64;
+pub type TableId = i64;
+pub type PartitionId = i64;
 pub type BucketId = i32;
 
 pub mod proto {
diff --git a/crates/fluss/src/metadata/mod.rs b/crates/fluss/src/metadata/mod.rs
index 9c0b1b4..0ca654a 100644
--- a/crates/fluss/src/metadata/mod.rs
+++ b/crates/fluss/src/metadata/mod.rs
@@ -19,10 +19,12 @@ mod data_lake_format;
 mod database;
 mod datatype;
 mod json_serde;
+mod partition;
 mod table;
 
 pub use data_lake_format::*;
 pub use database::*;
 pub use datatype::*;
 pub use json_serde::*;
+pub use partition::*;
 pub use table::*;
diff --git a/crates/fluss/src/metadata/partition.rs 
b/crates/fluss/src/metadata/partition.rs
new file mode 100644
index 0000000..1ecc0dc
--- /dev/null
+++ b/crates/fluss/src/metadata/partition.rs
@@ -0,0 +1,469 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::PartitionId;
+use crate::error::{Error, Result};
+use crate::proto::{PbKeyValue, PbPartitionInfo, PbPartitionSpec};
+use std::collections::HashMap;
+use std::fmt::{Display, Formatter};
+
+/// Represents a partition spec in fluss. Partition columns and values are NOT 
of strict order, and
+/// they need to be re-arranged to the correct order by comparing with a list 
of strictly ordered
+/// partition keys.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct PartitionSpec {
+    partition_spec: HashMap<String, String>,
+}
+
+impl PartitionSpec {
+    pub fn new(partition_spec: HashMap<String, String>) -> Self {
+        Self { partition_spec }
+    }
+
+    pub fn get_spec_map(&self) -> &HashMap<String, String> {
+        &self.partition_spec
+    }
+
+    pub fn to_pb(&self) -> PbPartitionSpec {
+        PbPartitionSpec {
+            partition_key_values: self
+                .partition_spec
+                .iter()
+                .map(|(k, v)| PbKeyValue {
+                    key: k.clone(),
+                    value: v.clone(),
+                })
+                .collect(),
+        }
+    }
+
+    pub fn from_pb(pb: &PbPartitionSpec) -> Self {
+        let partition_spec = pb
+            .partition_key_values
+            .iter()
+            .map(|kv| (kv.key.clone(), kv.value.clone()))
+            .collect();
+        Self { partition_spec }
+    }
+}
+
+impl Display for PartitionSpec {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "PartitionSpec{{{:?}}}", self.partition_spec)
+    }
+}
+
+/// Represents a partition, which is the resolved version of PartitionSpec. 
The partition
+/// spec is re-arranged into the correct order by comparing it with a list of 
strictly ordered
+/// partition keys.
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct ResolvedPartitionSpec {
+    partition_keys: Vec<String>,
+    partition_values: Vec<String>,
+}
+
+pub const PARTITION_SPEC_SEPARATOR: &str = "$";
+
+impl ResolvedPartitionSpec {
+    pub fn new(partition_keys: Vec<String>, partition_values: Vec<String>) -> 
Result<Self> {
+        if partition_keys.len() != partition_values.len() {
+            return Err(Error::IllegalArgument {
+                message: "The number of partition keys and partition values 
should be the same."
+                    .to_string(),
+            });
+        }
+        Ok(Self {
+            partition_keys,
+            partition_values,
+        })
+    }
+
+    pub fn from_partition_spec(
+        partition_keys: Vec<String>,
+        partition_spec: &PartitionSpec,
+    ) -> Self {
+        let partition_values =
+            Self::get_reordered_partition_values(&partition_keys, 
partition_spec);
+        Self {
+            partition_keys,
+            partition_values,
+        }
+    }
+
+    pub fn from_partition_value(partition_key: String, partition_value: 
String) -> Self {
+        Self {
+            partition_keys: vec![partition_key],
+            partition_values: vec![partition_value],
+        }
+    }
+
+    pub fn from_partition_name(partition_keys: Vec<String>, partition_name: 
&str) -> Self {
+        let partition_values: Vec<String> = partition_name
+            .split(PARTITION_SPEC_SEPARATOR)
+            .map(|s| s.to_string())
+            .collect();
+        Self {
+            partition_keys,
+            partition_values,
+        }
+    }
+
+    pub fn from_partition_qualified_name(qualified_partition_name: &str) -> 
Result<Self> {
+        let mut keys = Vec::new();
+        let mut values = Vec::new();
+
+        for pair in qualified_partition_name.split('/') {
+            let parts: Vec<&str> = pair.splitn(2, '=').collect();
+            if parts.len() != 2 {
+                return Err(Error::IllegalArgument {
+                    message: format!(
+                        "Invalid partition name format. Expected key=value, 
got: {}",
+                        pair
+                    ),
+                });
+            }
+            keys.push(parts[0].to_string());
+            values.push(parts[1].to_string());
+        }
+
+        Ok(Self {
+            partition_keys: keys,
+            partition_values: values,
+        })
+    }
+
+    pub fn get_partition_keys(&self) -> &[String] {
+        &self.partition_keys
+    }
+
+    pub fn get_partition_values(&self) -> &[String] {
+        &self.partition_values
+    }
+
+    pub fn to_partition_spec(&self) -> PartitionSpec {
+        let mut spec_map = HashMap::new();
+        for (i, key) in self.partition_keys.iter().enumerate() {
+            spec_map.insert(key.clone(), self.partition_values[i].clone());
+        }
+        PartitionSpec::new(spec_map)
+    }
+
+    /// Generate the partition name for a partition table with specified 
partition values.
+    ///
+    /// The partition name is in the following format: value1$value2$...$valueN
+    pub fn get_partition_name(&self) -> String {
+        self.partition_values.join(PARTITION_SPEC_SEPARATOR)
+    }
+
+    /// Returns the qualified partition name for a partition spec.
+    /// The format is: key1=value1/key2=value2/.../keyN=valueN
+    pub fn get_partition_qualified_name(&self) -> String {
+        let mut sb = String::new();
+        for (i, key) in self.partition_keys.iter().enumerate() {
+            sb.push_str(key);
+            sb.push('=');
+            sb.push_str(&self.partition_values[i]);
+            if i != self.partition_keys.len() - 1 {
+                sb.push('/');
+            }
+        }
+        sb
+    }
+
+    pub fn contains(&self, other: &ResolvedPartitionSpec) -> Result<bool> {
+        let other_partition_keys = other.get_partition_keys();
+        let other_partition_values = other.get_partition_values();
+
+        let mut expected_partition_values = Vec::new();
+        for other_partition_key in other_partition_keys {
+            let key_index = self
+                .partition_keys
+                .iter()
+                .position(|k| k == other_partition_key);
+            match key_index {
+                Some(idx) => 
expected_partition_values.push(self.partition_values[idx].clone()),
+                None => {
+                    return Err(Error::IllegalArgument {
+                        message: format!(
+                            "table does not contain partitionKey: {}",
+                            other_partition_key
+                        ),
+                    });
+                }
+            }
+        }
+
+        let expected_partition_name = 
expected_partition_values.join(PARTITION_SPEC_SEPARATOR);
+        let other_partition_name = 
other_partition_values.join(PARTITION_SPEC_SEPARATOR);
+
+        Ok(expected_partition_name == other_partition_name)
+    }
+
+    pub fn to_pb(&self) -> PbPartitionSpec {
+        PbPartitionSpec {
+            partition_key_values: self
+                .partition_keys
+                .iter()
+                .zip(self.partition_values.iter())
+                .map(|(k, v)| PbKeyValue {
+                    key: k.clone(),
+                    value: v.clone(),
+                })
+                .collect(),
+        }
+    }
+
+    pub fn from_pb(pb: &PbPartitionSpec) -> Self {
+        let partition_keys = pb
+            .partition_key_values
+            .iter()
+            .map(|kv| kv.key.clone())
+            .collect();
+        let partition_values = pb
+            .partition_key_values
+            .iter()
+            .map(|kv| kv.value.clone())
+            .collect();
+        Self {
+            partition_keys,
+            partition_values,
+        }
+    }
+
+    fn get_reordered_partition_values(
+        partition_keys: &[String],
+        partition_spec: &PartitionSpec,
+    ) -> Vec<String> {
+        let partition_spec_map = partition_spec.get_spec_map();
+        partition_keys
+            .iter()
+            .map(|key| 
partition_spec_map.get(key).cloned().unwrap_or_default())
+            .collect()
+    }
+}
+
+impl Display for ResolvedPartitionSpec {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.get_partition_qualified_name())
+    }
+}
+
+/// Information of a partition metadata, includes the partition's name and the 
partition id that
+/// represents the unique identifier of the partition.
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct PartitionInfo {
+    partition_id: PartitionId,
+    partition_spec: ResolvedPartitionSpec,
+}
+
+impl PartitionInfo {
+    pub fn new(partition_id: PartitionId, partition_spec: 
ResolvedPartitionSpec) -> Self {
+        Self {
+            partition_id,
+            partition_spec,
+        }
+    }
+
+    /// Get the partition id. The id is globally unique in the Fluss cluster.
+    pub fn get_partition_id(&self) -> PartitionId {
+        self.partition_id
+    }
+
+    /// Get the partition name.
+    pub fn get_partition_name(&self) -> String {
+        self.partition_spec.get_partition_name()
+    }
+
+    pub fn get_resolved_partition_spec(&self) -> &ResolvedPartitionSpec {
+        &self.partition_spec
+    }
+
+    pub fn get_partition_spec(&self) -> PartitionSpec {
+        self.partition_spec.to_partition_spec()
+    }
+
+    pub fn to_pb(&self) -> PbPartitionInfo {
+        PbPartitionInfo {
+            partition_id: self.partition_id,
+            partition_spec: self.partition_spec.to_pb(),
+        }
+    }
+
+    pub fn from_pb(pb: &PbPartitionInfo) -> Self {
+        Self {
+            partition_id: pb.partition_id,
+            partition_spec: ResolvedPartitionSpec::from_pb(&pb.partition_spec),
+        }
+    }
+}
+
+impl Display for PartitionInfo {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "Partition{{name='{}', id={}}}",
+            self.get_partition_name(),
+            self.partition_id
+        )
+    }
+}
+
+/// A class to identify a table partition, containing the table id and the 
partition id.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub struct TablePartition {
+    table_id: i64,
+    partition_id: PartitionId,
+}
+
+impl TablePartition {
+    pub fn new(table_id: i64, partition_id: PartitionId) -> Self {
+        Self {
+            table_id,
+            partition_id,
+        }
+    }
+
+    pub fn get_table_id(&self) -> i64 {
+        self.table_id
+    }
+
+    pub fn get_partition_id(&self) -> PartitionId {
+        self.partition_id
+    }
+}
+
+impl Display for TablePartition {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "TablePartition{{tableId={}, partitionId={}}}",
+            self.table_id, self.partition_id
+        )
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_resolved_partition_spec_name() {
+        let spec = ResolvedPartitionSpec::new(
+            vec!["date".to_string(), "region".to_string()],
+            vec!["2024-01-15".to_string(), "US".to_string()],
+        )
+        .unwrap();
+
+        assert_eq!(spec.get_partition_name(), "2024-01-15$US");
+        assert_eq!(
+            spec.get_partition_qualified_name(),
+            "date=2024-01-15/region=US"
+        );
+    }
+
+    #[test]
+    fn test_resolved_partition_spec_from_partition_name() {
+        let spec = ResolvedPartitionSpec::from_partition_name(
+            vec!["date".to_string(), "region".to_string()],
+            "2024-01-15$US",
+        );
+
+        assert_eq!(spec.get_partition_values(), &["2024-01-15", "US"]);
+    }
+
+    #[test]
+    fn test_resolved_partition_spec_from_qualified_name() {
+        let spec =
+            
ResolvedPartitionSpec::from_partition_qualified_name("date=2024-01-15/region=US")
+                .unwrap();
+
+        assert_eq!(spec.get_partition_keys(), &["date", "region"]);
+        assert_eq!(spec.get_partition_values(), &["2024-01-15", "US"]);
+    }
+
+    #[test]
+    fn test_resolved_partition_spec_mismatched_lengths() {
+        let result = ResolvedPartitionSpec::new(
+            vec!["date".to_string(), "region".to_string()],
+            vec!["2024-01-15".to_string()],
+        );
+
+        assert!(result.is_err());
+    }
+
+    #[test]
+    fn test_partition_info() {
+        let spec =
+            ResolvedPartitionSpec::new(vec!["date".to_string()], 
vec!["2024-01-15".to_string()])
+                .unwrap();
+
+        let info = PartitionInfo::new(42, spec);
+        assert_eq!(info.get_partition_id(), 42);
+        assert_eq!(info.get_partition_name(), "2024-01-15");
+    }
+
+    #[test]
+    fn test_table_partition() {
+        let tp = TablePartition::new(100, 42);
+        assert_eq!(tp.get_table_id(), 100);
+        assert_eq!(tp.get_partition_id(), 42);
+    }
+
+    #[test]
+    fn test_partition_spec_pb_roundtrip() {
+        let mut map = HashMap::new();
+        map.insert("date".to_string(), "2024-01-15".to_string());
+        let spec = PartitionSpec::new(map);
+
+        let pb = spec.to_pb();
+        let restored = PartitionSpec::from_pb(&pb);
+
+        assert_eq!(
+            spec.get_spec_map().get("date"),
+            restored.get_spec_map().get("date")
+        );
+    }
+
+    #[test]
+    fn test_partition_info_pb_roundtrip() {
+        let spec =
+            ResolvedPartitionSpec::new(vec!["date".to_string()], 
vec!["2024-01-15".to_string()])
+                .unwrap();
+        let info = PartitionInfo::new(42, spec);
+
+        let pb = info.to_pb();
+        let restored = PartitionInfo::from_pb(&pb);
+
+        assert_eq!(info.get_partition_id(), restored.get_partition_id());
+        assert_eq!(info.get_partition_name(), restored.get_partition_name());
+    }
+
+    #[test]
+    fn test_contains() {
+        let full_spec = ResolvedPartitionSpec::new(
+            vec!["date".to_string(), "region".to_string()],
+            vec!["2024-01-15".to_string(), "US".to_string()],
+        )
+        .unwrap();
+
+        let partial_spec =
+            ResolvedPartitionSpec::new(vec!["date".to_string()], 
vec!["2024-01-15".to_string()])
+                .unwrap();
+
+        assert!(full_spec.contains(&partial_spec).unwrap());
+    }
+}
diff --git a/crates/fluss/src/metadata/table.rs 
b/crates/fluss/src/metadata/table.rs
index f4cf972..c4a9195 100644
--- a/crates/fluss/src/metadata/table.rs
+++ b/crates/fluss/src/metadata/table.rs
@@ -20,6 +20,7 @@ use crate::error::Error::{IllegalArgument, InvalidTableError};
 use crate::error::{Error, Result};
 use crate::metadata::DataLakeFormat;
 use crate::metadata::datatype::{DataField, DataType, RowType};
+use crate::{BucketId, PartitionId, TableId};
 use core::fmt;
 use serde::{Deserialize, Serialize};
 use std::collections::{HashMap, HashSet};
@@ -697,32 +698,71 @@ impl TablePath {
     }
 }
 
+/// A database name, table name and partition name combo. It's used to 
represent the physical path of
+/// a bucket. If the bucket belongs to a partition (i.e., the table is a 
partitioned table),
+/// `partition_name` will be `Some(...)`; otherwise, it will be `None`.
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub struct PhysicalTablePath {
     table_path: TablePath,
-    #[allow(dead_code)]
-    partition: Option<String>,
+    partition_name: Option<String>,
 }
 
 impl PhysicalTablePath {
     pub fn of(table_path: TablePath) -> Self {
         Self {
             table_path,
-            partition: None,
+            partition_name: None,
         }
     }
 
-    // TODO: support partition
+    pub fn of_partitioned(table_path: TablePath, partition_name: 
Option<String>) -> Self {
+        Self {
+            table_path,
+            partition_name,
+        }
+    }
+
+    pub fn of_with_names(
+        database_name: String,
+        table_name: String,
+        partition_name: Option<String>,
+    ) -> Self {
+        Self {
+            table_path: TablePath::new(database_name, table_name),
+            partition_name,
+        }
+    }
 
     pub fn get_table_path(&self) -> &TablePath {
         &self.table_path
     }
+
+    pub fn get_database_name(&self) -> &str {
+        self.table_path.database()
+    }
+
+    pub fn get_table_name(&self) -> &str {
+        self.table_path.table()
+    }
+
+    pub fn get_partition_name(&self) -> Option<&String> {
+        self.partition_name.as_ref()
+    }
+}
+
+impl Display for PhysicalTablePath {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        match &self.partition_name {
+            Some(partition) => write!(f, "{}(p={})", self.table_path, 
partition),
+            None => write!(f, "{}", self.table_path),
+        }
+    }
 }
 
 #[derive(Debug, Clone)]
 pub struct TableInfo {
     pub table_path: TablePath,
-    pub table_id: i64,
+    pub table_id: TableId,
     pub schema_id: i32,
     pub schema: Schema,
     pub row_type: RowType,
@@ -819,7 +859,7 @@ impl TableInfo {
     #[allow(clippy::too_many_arguments)]
     pub fn new(
         table_path: TablePath,
-        table_id: i64,
+        table_id: TableId,
         schema_id: i32,
         schema: Schema,
         bucket_keys: Vec<String>,
@@ -1000,13 +1040,13 @@ impl Display for TableInfo {
 
 #[derive(Debug, Clone, Serialize, Deserialize, Hash, PartialEq, Eq)]
 pub struct TableBucket {
-    table_id: i64,
-    partition_id: Option<i64>,
-    bucket: i32,
+    table_id: TableId,
+    partition_id: Option<PartitionId>,
+    bucket: BucketId,
 }
 
 impl TableBucket {
-    pub fn new(table_id: i64, bucket: i32) -> Self {
+    pub fn new(table_id: TableId, bucket: BucketId) -> Self {
         TableBucket {
             table_id,
             partition_id: None,
@@ -1014,15 +1054,15 @@ impl TableBucket {
         }
     }
 
-    pub fn table_id(&self) -> i64 {
+    pub fn table_id(&self) -> TableId {
         self.table_id
     }
 
-    pub fn bucket_id(&self) -> i32 {
+    pub fn bucket_id(&self) -> BucketId {
         self.bucket
     }
 
-    pub fn partition_id(&self) -> Option<i64> {
+    pub fn partition_id(&self) -> Option<PartitionId> {
         self.partition_id
     }
 }
diff --git a/crates/fluss/src/proto/fluss_api.proto 
b/crates/fluss/src/proto/fluss_api.proto
index 65eddce..eca4cf3 100644
--- a/crates/fluss/src/proto/fluss_api.proto
+++ b/crates/fluss/src/proto/fluss_api.proto
@@ -374,4 +374,38 @@ message PbLookupRespForBucket {
 
 message PbValue {
   optional bytes values = 1;
-}
\ No newline at end of file
+}
+
+message PbPartitionSpec {
+  repeated PbKeyValue partition_key_values = 1;
+}
+
+message PbPartitionInfo {
+  required int64 partition_id = 1;
+  required PbPartitionSpec partition_spec = 2;
+}
+
+message ListPartitionInfosRequest {
+  required PbTablePath table_path = 1;
+  optional PbPartitionSpec partial_partition_spec = 2;
+}
+
+message ListPartitionInfosResponse {
+  repeated PbPartitionInfo partitions_info = 1;
+}
+
+message CreatePartitionRequest {
+  required PbTablePath table_path = 1;
+  required PbPartitionSpec partition_spec = 2;
+  required bool ignore_if_exists = 3;
+}
+
+message CreatePartitionResponse {}
+
+message DropPartitionRequest {
+  required PbTablePath table_path = 1;
+  required PbPartitionSpec partition_spec = 2;
+  required bool ignore_if_not_exists = 3;
+}
+
+message DropPartitionResponse {}
\ No newline at end of file
diff --git a/crates/fluss/src/rpc/api_key.rs b/crates/fluss/src/rpc/api_key.rs
index 66e4beb..f6009c0 100644
--- a/crates/fluss/src/rpc/api_key.rs
+++ b/crates/fluss/src/rpc/api_key.rs
@@ -27,6 +27,7 @@ pub enum ApiKey {
     DropTable,
     GetTable,
     ListTables,
+    ListPartitionInfos,
     TableExists,
     MetaData,
     ProduceLog,
@@ -37,6 +38,8 @@ pub enum ApiKey {
     GetFileSystemSecurityToken,
     GetDatabaseInfo,
     GetLatestLakeSnapshot,
+    CreatePartition,
+    DropPartition,
     Unknown(i16),
 }
 
@@ -51,6 +54,7 @@ impl From<i16> for ApiKey {
             1006 => ApiKey::DropTable,
             1007 => ApiKey::GetTable,
             1008 => ApiKey::ListTables,
+            1009 => ApiKey::ListPartitionInfos,
             1010 => ApiKey::TableExists,
             1012 => ApiKey::MetaData,
             1014 => ApiKey::ProduceLog,
@@ -61,6 +65,8 @@ impl From<i16> for ApiKey {
             1025 => ApiKey::GetFileSystemSecurityToken,
             1032 => ApiKey::GetLatestLakeSnapshot,
             1035 => ApiKey::GetDatabaseInfo,
+            1036 => ApiKey::CreatePartition,
+            1037 => ApiKey::DropPartition,
             _ => Unknown(key),
         }
     }
@@ -77,6 +83,7 @@ impl From<ApiKey> for i16 {
             ApiKey::DropTable => 1006,
             ApiKey::GetTable => 1007,
             ApiKey::ListTables => 1008,
+            ApiKey::ListPartitionInfos => 1009,
             ApiKey::TableExists => 1010,
             ApiKey::MetaData => 1012,
             ApiKey::ProduceLog => 1014,
@@ -87,6 +94,8 @@ impl From<ApiKey> for i16 {
             ApiKey::GetFileSystemSecurityToken => 1025,
             ApiKey::GetLatestLakeSnapshot => 1032,
             ApiKey::GetDatabaseInfo => 1035,
+            ApiKey::CreatePartition => 1036,
+            ApiKey::DropPartition => 1037,
             Unknown(x) => x,
         }
     }
@@ -107,6 +116,7 @@ mod tests {
             (1006, ApiKey::DropTable),
             (1007, ApiKey::GetTable),
             (1008, ApiKey::ListTables),
+            (1009, ApiKey::ListPartitionInfos),
             (1010, ApiKey::TableExists),
             (1012, ApiKey::MetaData),
             (1014, ApiKey::ProduceLog),
@@ -117,6 +127,8 @@ mod tests {
             (1025, ApiKey::GetFileSystemSecurityToken),
             (1032, ApiKey::GetLatestLakeSnapshot),
             (1035, ApiKey::GetDatabaseInfo),
+            (1036, ApiKey::CreatePartition),
+            (1037, ApiKey::DropPartition),
         ];
 
         for (raw, key) in cases {
diff --git a/crates/fluss/src/rpc/message/create_partition.rs 
b/crates/fluss/src/rpc/message/create_partition.rs
new file mode 100644
index 0000000..93dbf70
--- /dev/null
+++ b/crates/fluss/src/rpc/message/create_partition.rs
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metadata::{PartitionSpec, TablePath};
+use crate::proto::CreatePartitionResponse;
+use crate::rpc::api_key::ApiKey;
+use crate::rpc::api_version::ApiVersion;
+use crate::rpc::convert::to_table_path;
+use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
+use crate::{impl_read_version_type, impl_write_version_type, proto};
+use bytes::{Buf, BufMut};
+use prost::Message;
+
+#[derive(Debug)]
+pub struct CreatePartitionRequest {
+    pub inner_request: proto::CreatePartitionRequest,
+}
+
+impl CreatePartitionRequest {
+    pub fn new(
+        table_path: &TablePath,
+        partition_spec: &PartitionSpec,
+        ignore_if_exists: bool,
+    ) -> Self {
+        CreatePartitionRequest {
+            inner_request: proto::CreatePartitionRequest {
+                table_path: to_table_path(table_path),
+                partition_spec: partition_spec.to_pb(),
+                ignore_if_exists,
+            },
+        }
+    }
+}
+
+impl RequestBody for CreatePartitionRequest {
+    type ResponseBody = CreatePartitionResponse;
+
+    const API_KEY: ApiKey = ApiKey::CreatePartition;
+
+    const REQUEST_VERSION: ApiVersion = ApiVersion(0);
+}
+
+impl_write_version_type!(CreatePartitionRequest);
+impl_read_version_type!(CreatePartitionResponse);
diff --git a/crates/fluss/src/rpc/message/drop_partition.rs b/crates/fluss/src/rpc/message/drop_partition.rs
new file mode 100644
index 0000000..ddc97d8
--- /dev/null
+++ b/crates/fluss/src/rpc/message/drop_partition.rs
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metadata::{PartitionSpec, TablePath};
+use crate::proto::DropPartitionResponse;
+use crate::rpc::api_key::ApiKey;
+use crate::rpc::api_version::ApiVersion;
+use crate::rpc::convert::to_table_path;
+use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
+use crate::{impl_read_version_type, impl_write_version_type, proto};
+use bytes::{Buf, BufMut};
+use prost::Message;
+
+#[derive(Debug)]
+pub struct DropPartitionRequest {
+    pub inner_request: proto::DropPartitionRequest,
+}
+
+impl DropPartitionRequest {
+    pub fn new(
+        table_path: &TablePath,
+        partition_spec: &PartitionSpec,
+        ignore_if_not_exists: bool,
+    ) -> Self {
+        DropPartitionRequest {
+            inner_request: proto::DropPartitionRequest {
+                table_path: to_table_path(table_path),
+                partition_spec: partition_spec.to_pb(),
+                ignore_if_not_exists,
+            },
+        }
+    }
+}
+
+impl RequestBody for DropPartitionRequest {
+    type ResponseBody = DropPartitionResponse;
+
+    const API_KEY: ApiKey = ApiKey::DropPartition;
+
+    const REQUEST_VERSION: ApiVersion = ApiVersion(0);
+}
+
+impl_write_version_type!(DropPartitionRequest);
+impl_read_version_type!(DropPartitionResponse);
diff --git a/crates/fluss/src/rpc/message/list_offsets.rs b/crates/fluss/src/rpc/message/list_offsets.rs
index fcecb41..262645a 100644
--- a/crates/fluss/src/rpc/message/list_offsets.rs
+++ b/crates/fluss/src/rpc/message/list_offsets.rs
@@ -15,7 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::{impl_read_version_type, impl_write_version_type, proto};
+use crate::{
+    BucketId, PartitionId, TableId, impl_read_version_type, impl_write_version_type, proto,
+};
 
 use crate::error::Result as FlussResult;
 use crate::error::{Error, FlussError};
@@ -74,9 +76,9 @@ pub struct ListOffsetsRequest {
 
 impl ListOffsetsRequest {
     pub fn new(
-        table_id: i64,
-        partition_id: Option<i64>,
-        bucket_ids: Vec<i32>,
+        table_id: TableId,
+        partition_id: Option<PartitionId>,
+        bucket_ids: Vec<BucketId>,
         offset_spec: OffsetSpec,
     ) -> Self {
         ListOffsetsRequest {
diff --git a/crates/fluss/src/rpc/message/list_partition_infos.rs b/crates/fluss/src/rpc/message/list_partition_infos.rs
new file mode 100644
index 0000000..ab69367
--- /dev/null
+++ b/crates/fluss/src/rpc/message/list_partition_infos.rs
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metadata::{PartitionInfo, PartitionSpec, TablePath};
+use crate::proto::ListPartitionInfosResponse;
+use crate::rpc::api_key::ApiKey;
+use crate::rpc::api_version::ApiVersion;
+use crate::rpc::convert::to_table_path;
+use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
+use crate::{impl_read_version_type, impl_write_version_type, proto};
+use bytes::{Buf, BufMut};
+use prost::Message;
+
+#[derive(Debug)]
+pub struct ListPartitionInfosRequest {
+    pub inner_request: proto::ListPartitionInfosRequest,
+}
+
+impl ListPartitionInfosRequest {
+    pub fn new(table_path: &TablePath, partial_partition_spec: Option<&PartitionSpec>) -> Self {
+        ListPartitionInfosRequest {
+            inner_request: proto::ListPartitionInfosRequest {
+                table_path: to_table_path(table_path),
+                partial_partition_spec: partial_partition_spec.map(|s| s.to_pb()),
+            },
+        }
+    }
+}
+
+impl RequestBody for ListPartitionInfosRequest {
+    type ResponseBody = ListPartitionInfosResponse;
+
+    const API_KEY: ApiKey = ApiKey::ListPartitionInfos;
+
+    const REQUEST_VERSION: ApiVersion = ApiVersion(0);
+}
+
+impl_write_version_type!(ListPartitionInfosRequest);
+impl_read_version_type!(ListPartitionInfosResponse);
+
+impl ListPartitionInfosResponse {
+    pub fn get_partitions_info(&self) -> Vec<PartitionInfo> {
+        self.partitions_info
+            .iter()
+            .map(PartitionInfo::from_pb)
+            .collect()
+    }
+}
diff --git a/crates/fluss/src/rpc/message/mod.rs b/crates/fluss/src/rpc/message/mod.rs
index 881a64f..addb97a 100644
--- a/crates/fluss/src/rpc/message/mod.rs
+++ b/crates/fluss/src/rpc/message/mod.rs
@@ -21,9 +21,11 @@ use crate::rpc::frame::{ReadError, WriteError};
 use bytes::{Buf, BufMut};
 
 mod create_database;
+mod create_partition;
 mod create_table;
 mod database_exists;
 mod drop_database;
+mod drop_partition;
 mod drop_table;
 mod fetch;
 mod get_database_info;
@@ -33,6 +35,7 @@ mod get_table;
 mod header;
 mod list_databases;
 mod list_offsets;
+mod list_partition_infos;
 mod list_tables;
 mod lookup;
 mod produce_log;
@@ -42,9 +45,11 @@ mod update_metadata;
 
 pub use crate::rpc::RpcError;
 pub use create_database::*;
+pub use create_partition::*;
 pub use create_table::*;
 pub use database_exists::*;
 pub use drop_database::*;
+pub use drop_partition::*;
 pub use drop_table::*;
 pub use fetch::*;
 pub use get_database_info::*;
@@ -54,6 +59,7 @@ pub use get_table::*;
 pub use header::*;
 pub use list_databases::*;
 pub use list_offsets::*;
+pub use list_partition_infos::*;
 pub use list_tables::*;
 pub use lookup::*;
 pub use produce_log::*;
diff --git a/crates/fluss/src/util/mod.rs b/crates/fluss/src/util/mod.rs
index 156ef04..3760487 100644
--- a/crates/fluss/src/util/mod.rs
+++ b/crates/fluss/src/util/mod.rs
@@ -18,6 +18,7 @@
 pub mod murmur_hash;
 pub mod varint;
 
+use crate::TableId;
 use crate::metadata::TableBucket;
 use linked_hash_map::LinkedHashMap;
 use std::collections::{HashMap, HashSet};
@@ -151,7 +152,7 @@ impl<S> FairBucketStatusMap<S> {
         self.map.clear();
 
         // Group buckets by table ID
-        let mut table_to_buckets: LinkedHashMap<i64, Vec<TableBucket>> = LinkedHashMap::new();
+        let mut table_to_buckets: LinkedHashMap<TableId, Vec<TableBucket>> = LinkedHashMap::new();
         for bucket in bucket_to_status.keys() {
             table_to_buckets
                 .entry(bucket.table_id())
diff --git a/crates/fluss/tests/integration/admin.rs b/crates/fluss/tests/integration/admin.rs
index fbdb295..9842a5a 100644
--- a/crates/fluss/tests/integration/admin.rs
+++ b/crates/fluss/tests/integration/admin.rs
@@ -37,9 +37,10 @@ mod admin_test {
     use crate::integration::utils::{get_cluster, start_cluster, stop_cluster};
     use fluss::error::FlussError;
     use fluss::metadata::{
-        DataTypes, DatabaseDescriptorBuilder, KvFormat, LogFormat, Schema, TableDescriptor,
-        TablePath,
+        DataTypes, DatabaseDescriptorBuilder, KvFormat, LogFormat, PartitionSpec, Schema,
+        TableDescriptor, TablePath,
     };
+    use std::collections::HashMap;
     use std::sync::Arc;
 
     fn before_all() {
@@ -223,6 +224,146 @@ mod admin_test {
         assert_eq!(admin.database_exists(test_db_name).await.unwrap(), false);
     }
 
+    #[tokio::test]
+    async fn test_partition_apis() {
+        let cluster = get_fluss_cluster();
+        let connection = cluster.get_fluss_connection().await;
+        let admin = connection
+            .get_admin()
+            .await
+            .expect("Failed to get admin client");
+
+        let test_db_name = "test_partition_apis_db";
+        let db_descriptor = DatabaseDescriptorBuilder::default()
+            .comment("Database for test_partition_apis")
+            .build();
+
+        admin
+            .create_database(test_db_name, true, Some(&db_descriptor))
+            .await
+            .expect("Failed to create test database");
+
+        let test_table_name = "partitioned_table";
+        let table_path = TablePath::new(test_db_name.to_string(), test_table_name.to_string());
+
+        let table_schema = Schema::builder()
+            .column("id", DataTypes::int())
+            .column("name", DataTypes::string())
+            .column("dt", DataTypes::string())
+            .column("region", DataTypes::string())
+            .primary_key(vec![
+                "id".to_string(),
+                "dt".to_string(),
+                "region".to_string(),
+            ])
+            .build()
+            .expect("Failed to build table schema");
+
+        let table_descriptor = TableDescriptor::builder()
+            .schema(table_schema)
+            .distributed_by(Some(3), vec!["id".to_string()])
+            .partitioned_by(vec!["dt".to_string(), "region".to_string()])
+            .property("table.replication.factor", "1")
+            .log_format(LogFormat::ARROW)
+            .kv_format(KvFormat::COMPACTED)
+            .build()
+            .expect("Failed to build table descriptor");
+
+        admin
+            .create_table(&table_path, &table_descriptor, true)
+            .await
+            .expect("Failed to create partitioned table");
+
+        let partitions = admin
+            .list_partition_infos(&table_path)
+            .await
+            .expect("Failed to list partitions");
+        assert!(
+            partitions.is_empty(),
+            "Expected no partitions initially, found {}",
+            partitions.len()
+        );
+
+        let mut partition_values = HashMap::new();
+        partition_values.insert("dt".to_string(), "2024-01-15".to_string());
+        partition_values.insert("region".to_string(), "EMEA".to_string());
+        let partition_spec = PartitionSpec::new(partition_values);
+
+        admin
+            .create_partition(&table_path, &partition_spec, false)
+            .await
+            .expect("Failed to create partition");
+
+        let partitions = admin
+            .list_partition_infos(&table_path)
+            .await
+            .expect("Failed to list partitions");
+        assert_eq!(
+            partitions.len(),
+            1,
+            "Expected exactly one partition after creation"
+        );
+        assert_eq!(
+            partitions[0].get_partition_name(),
+            "2024-01-15$EMEA",
+            "Partition name mismatch"
+        );
+
+        // list with partial spec filter - should find the partition
+        let mut partition_values = HashMap::new();
+        partition_values.insert("dt".to_string(), "2024-01-15".to_string());
+        let partial_partition_spec = PartitionSpec::new(partition_values);
+
+        let partitions_with_spec = admin
+            .list_partition_infos_with_spec(&table_path, Some(&partial_partition_spec))
+            .await
+            .expect("Failed to list partitions with spec");
+        assert_eq!(
+            partitions_with_spec.len(),
+            1,
+            "Expected one partition matching the spec"
+        );
+        assert_eq!(
+            partitions_with_spec[0].get_partition_name(),
+            "2024-01-15$EMEA",
+            "Partition name mismatch with spec filter"
+        );
+
+        // list with non-matching spec - should find no partitions
+        let mut non_matching_values = HashMap::new();
+        non_matching_values.insert("dt".to_string(), "2024-01-16".to_string());
+        let non_matching_spec = PartitionSpec::new(non_matching_values);
+        let partitions_non_matching = admin
+            .list_partition_infos_with_spec(&table_path, Some(&non_matching_spec))
+            .await
+            .expect("Failed to list partitions with non-matching spec");
+        assert!(
+            partitions_non_matching.is_empty(),
+            "Expected no partitions for non-matching spec"
+        );
+
+        admin
+            .drop_partition(&table_path, &partition_spec, false)
+            .await
+            .expect("Failed to drop partition");
+
+        let partitions = admin
+            .list_partition_infos(&table_path)
+            .await
+            .expect("Failed to list partitions");
+        assert!(
+            partitions.is_empty(),
+            "Expected no partitions after drop, found {}",
+            partitions.len()
+        );
+
+        admin
+            .drop_table(&table_path, true)
+            .await
+            .expect("Failed to drop table");
+        admin.drop_database(test_db_name, true, true).await;
+    }
+
     #[tokio::test]
     async fn test_fluss_error_response() {
         let cluster = get_fluss_cluster();


Reply via email to