This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 884ef56  feat: implement REST API database and table CRUD operations 
with DLF authentication (#147)
884ef56 is described below

commit 884ef56b857960d691577ca2580e3702cfed671c
Author: umi <[email protected]>
AuthorDate: Sun Mar 29 09:51:25 2026 +0800

    feat: implement REST API database and table CRUD operations with DLF 
authentication (#147)
    
    - Add full REST API support for database and table operations
    - Implement DLF (Alibaba Cloud Data Lake Formation) authentication provider
    - Add bearer token authentication support
    - Refactor mock server for better testing
    - Add comprehensive test coverage for REST API operations
    - Fix clippy warnings and apply code formatting
    
    Database operations:
    - list_databases, create_database, get_database, alter_database, 
drop_database
    
    Table operations:
    - list_tables, create_table, get_table, rename_table, drop_table
    
    Authentication:
    - DLF authentication with signature generation
    - Bearer token authentication
    - Configurable token provider factory
---
 crates/paimon/Cargo.toml                           |  10 +
 .../paimon/examples/rest_list_databases_example.rs |  67 ---
 crates/paimon/src/api/api_request.rs               | 134 +++++
 crates/paimon/src/api/api_response.rs              | 184 +++++-
 crates/paimon/src/api/auth/base.rs                 |  22 +-
 .../auth/{bear_provider.rs => bearer_provider.rs}  |  33 +-
 crates/paimon/src/api/auth/dlf_provider.rs         | 474 +++++++++++++++
 crates/paimon/src/api/auth/dlf_signer.rs           | 650 +++++++++++++++++++++
 crates/paimon/src/api/auth/factory.rs              | 164 +++++-
 crates/paimon/src/api/auth/mod.rs                  |   9 +-
 crates/paimon/src/api/mod.rs                       |   9 +-
 crates/paimon/src/api/resource_paths.rs            |  82 +++
 crates/paimon/src/api/rest_api.rs                  | 210 ++++++-
 crates/paimon/src/api/rest_client.rs               | 115 +++-
 crates/paimon/src/common/options.rs                |  29 +
 crates/paimon/tests/mock_server.rs                 | 579 +++++++++++++++++-
 crates/paimon/tests/rest_api_test.rs               | 405 ++++++++++++-
 17 files changed, 3001 insertions(+), 175 deletions(-)

diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml
index d2768b4..8ea6f9e 100644
--- a/crates/paimon/Cargo.toml
+++ b/crates/paimon/Cargo.toml
@@ -59,6 +59,16 @@ futures = "0.3"
 parquet = { workspace = true, features = ["async", "zstd"] }
 async-stream = "0.3.6"
 reqwest = { version = "0.12", features = ["json"] }
+# DLF authentication dependencies
+base64 = "0.22"
+hex = "0.4"
+hmac = "0.12"
+sha1 = "0.10"
+sha2 = "0.10"
+md-5 = "0.10"
+regex = "1"
+uuid = { version = "1", features = ["v4"] }
+urlencoding = "2.1"
 
 [dev-dependencies]
 axum = { version = "0.7", features = ["macros", "tokio", "http1", "http2"] }
diff --git a/crates/paimon/examples/rest_list_databases_example.rs 
b/crates/paimon/examples/rest_list_databases_example.rs
deleted file mode 100644
index 2a5e435..0000000
--- a/crates/paimon/examples/rest_list_databases_example.rs
+++ /dev/null
@@ -1,67 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Example: List databases using RESTApi
-//!
-//! This example demonstrates how to create a RESTApi instance
-//! and call the list_databases() API to retrieve all databases.
-//!
-//! # Usage
-//! ```bash
-//! cargo run -p paimon --example list_databases_example
-//! ```
-
-use paimon::api::rest_api::RESTApi;
-use paimon::common::{CatalogOptions, Options};
-
-#[tokio::main]
-async fn main() {
-    // Create configuration options
-    let mut options = Options::new();
-
-    // Basic configuration - replace with your actual server URL
-    options.set(CatalogOptions::METASTORE, "rest");
-    options.set(CatalogOptions::WAREHOUSE, "your_warehouse");
-    options.set(CatalogOptions::URI, "http://localhost:8080/";);
-
-    // Bearer token authentication (optional)
-    // options.set(CatalogOptions::TOKEN_PROVIDER, "bear");
-    // options.set(CatalogOptions::TOKEN, "your_token");
-
-    // Create RESTApi instance
-    // config_required = true means it will fetch config from server
-    println!("Creating RESTApi instance...");
-    let api = match RESTApi::new(options, true).await {
-        Ok(api) => api,
-        Err(e) => {
-            eprintln!("Failed to create RESTApi: {e}");
-            return;
-        }
-    };
-
-    // Call list_databases() API
-    println!("Calling list_databases()...");
-    match api.list_databases().await {
-        Ok(databases) => {
-            println!("Databases found: {databases:?}");
-            println!("Total count: {}", databases.len());
-        }
-        Err(e) => {
-            eprintln!("Failed to list databases: {e}");
-        }
-    }
-}
diff --git a/crates/paimon/src/api/api_request.rs 
b/crates/paimon/src/api/api_request.rs
new file mode 100644
index 0000000..33a52fd
--- /dev/null
+++ b/crates/paimon/src/api/api_request.rs
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! REST API request types for Paimon.
+//!
+//! This module contains all request structures used in REST API calls.
+
+use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
+
+use crate::{catalog::Identifier, spec::Schema};
+
+/// Request to create a new database.
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct CreateDatabaseRequest {
+    /// The name of the database to create.
+    pub name: String,
+    /// Optional configuration options for the database.
+    pub options: HashMap<String, String>,
+}
+
+impl CreateDatabaseRequest {
+    /// Create a new CreateDatabaseRequest.
+    pub fn new(name: String, options: HashMap<String, String>) -> Self {
+        Self { name, options }
+    }
+}
+
+/// Request to alter a database's configuration.
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct AlterDatabaseRequest {
+    /// Keys to remove from the database options.
+    pub removals: Vec<String>,
+    /// Key-value pairs to update in the database options.
+    pub updates: HashMap<String, String>,
+}
+
+impl AlterDatabaseRequest {
+    /// Create a new AlterDatabaseRequest.
+    pub fn new(removals: Vec<String>, updates: HashMap<String, String>) -> 
Self {
+        Self { removals, updates }
+    }
+}
+
+/// Request to rename a table.
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct RenameTableRequest {
+    /// The source table identifier.
+    pub source: Identifier,
+    /// The destination table identifier.
+    pub destination: Identifier,
+}
+
+impl RenameTableRequest {
+    /// Create a new RenameTableRequest.
+    pub fn new(source: Identifier, destination: Identifier) -> Self {
+        Self {
+            source,
+            destination,
+        }
+    }
+}
+
+/// Request to create a new table.
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct CreateTableRequest {
+    /// The identifier for the table to create.
+    pub identifier: Identifier,
+    /// The schema definition for the table.
+    pub schema: Schema,
+}
+
+impl CreateTableRequest {
+    /// Create a new CreateTableRequest.
+    pub fn new(identifier: Identifier, schema: Schema) -> Self {
+        Self { identifier, schema }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_create_database_request_serialization() {
+        let mut options = HashMap::new();
+        options.insert("key".to_string(), "value".to_string());
+        let req = CreateDatabaseRequest::new("test_db".to_string(), options);
+
+        let json = serde_json::to_string(&req).unwrap();
+        assert!(json.contains("\"name\":\"test_db\""));
+        assert!(json.contains("\"options\""));
+    }
+
+    #[test]
+    fn test_alter_database_request_serialization() {
+        let mut updates = HashMap::new();
+        updates.insert("key".to_string(), "new_value".to_string());
+        let req = AlterDatabaseRequest::new(vec!["old_key".to_string()], 
updates);
+
+        let json = serde_json::to_string(&req).unwrap();
+        assert!(json.contains("\"removals\":[\"old_key\"]"));
+        assert!(json.contains("\"updates\""));
+    }
+
+    #[test]
+    fn test_rename_table_request_serialization() {
+        let source = Identifier::new("db1".to_string(), "table1".to_string());
+        let destination = Identifier::new("db2".to_string(), 
"table2".to_string());
+        let req = RenameTableRequest::new(source, destination);
+
+        let json = serde_json::to_string(&req).unwrap();
+        assert!(json.contains("\"source\""));
+        assert!(json.contains("\"destination\""));
+    }
+}
diff --git a/crates/paimon/src/api/api_response.rs 
b/crates/paimon/src/api/api_response.rs
index 83296c9..e282080 100644
--- a/crates/paimon/src/api/api_response.rs
+++ b/crates/paimon/src/api/api_response.rs
@@ -22,8 +22,7 @@
 use serde::{Deserialize, Serialize};
 use std::collections::HashMap;
 
-/// Base trait for REST responses.
-pub trait RESTResponse {}
+use crate::spec::Schema;
 
 /// Error response from REST API calls.
 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
@@ -39,7 +38,6 @@ pub struct ErrorResponse {
     pub code: Option<i32>,
 }
 
-impl RESTResponse for ErrorResponse {}
 impl ErrorResponse {
     /// Create a new ErrorResponse.
     pub fn new(
@@ -57,6 +55,141 @@ impl ErrorResponse {
     }
 }
 
+/// Base response containing audit information.
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct AuditRESTResponse {
+    /// The owner of the resource.
+    pub owner: Option<String>,
+    /// Timestamp when the resource was created.
+    pub created_at: Option<i64>,
+    /// User who created the resource.
+    pub created_by: Option<String>,
+    /// Timestamp when the resource was last updated.
+    pub updated_at: Option<i64>,
+    /// User who last updated the resource.
+    pub updated_by: Option<String>,
+}
+
+impl AuditRESTResponse {
+    /// Create a new AuditRESTResponse.
+    pub fn new(
+        owner: Option<String>,
+        created_at: Option<i64>,
+        created_by: Option<String>,
+        updated_at: Option<i64>,
+        updated_by: Option<String>,
+    ) -> Self {
+        Self {
+            owner,
+            created_at,
+            created_by,
+            updated_at,
+            updated_by,
+        }
+    }
+
+    /// Put audit options into the provided dictionary.
+    pub fn put_audit_options_to(&self, options: &mut HashMap<String, String>) {
+        if let Some(owner) = &self.owner {
+            options.insert("owner".to_string(), owner.clone());
+        }
+        if let Some(created_by) = &self.created_by {
+            options.insert("createdBy".to_string(), created_by.clone());
+        }
+        if let Some(created_at) = self.created_at {
+            options.insert("createdAt".to_string(), created_at.to_string());
+        }
+        if let Some(updated_by) = &self.updated_by {
+            options.insert("updatedBy".to_string(), updated_by.clone());
+        }
+        if let Some(updated_at) = self.updated_at {
+            options.insert("updatedAt".to_string(), updated_at.to_string());
+        }
+    }
+}
+
+/// Response for getting a table.
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct GetTableResponse {
+    /// Audit information.
+    #[serde(flatten)]
+    pub audit: AuditRESTResponse,
+    /// The unique identifier of the table.
+    pub id: Option<String>,
+    /// The name of the table.
+    pub name: Option<String>,
+    /// The path to the table.
+    pub path: Option<String>,
+    /// Whether the table is external.
+    pub is_external: Option<bool>,
+    /// The schema ID of the table.
+    pub schema_id: Option<i64>,
+    /// The schema of the table.
+    pub schema: Option<Schema>,
+}
+
+impl GetTableResponse {
+    /// Create a new GetTableResponse.
+    #[allow(clippy::too_many_arguments)]
+    pub fn new(
+        id: Option<String>,
+        name: Option<String>,
+        path: Option<String>,
+        is_external: Option<bool>,
+        schema_id: Option<i64>,
+        schema: Option<Schema>,
+        audit: AuditRESTResponse,
+    ) -> Self {
+        Self {
+            audit,
+            id,
+            name,
+            path,
+            is_external,
+            schema_id,
+            schema,
+        }
+    }
+}
+
+/// Response for getting a database.
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct GetDatabaseResponse {
+    /// Audit information.
+    #[serde(flatten)]
+    pub audit: AuditRESTResponse,
+    /// The unique identifier of the database.
+    pub id: Option<String>,
+    /// The name of the database.
+    pub name: Option<String>,
+    /// The location of the database.
+    pub location: Option<String>,
+    /// Configuration options for the database.
+    pub options: HashMap<String, String>,
+}
+
+impl GetDatabaseResponse {
+    /// Create a new GetDatabaseResponse.
+    pub fn new(
+        id: Option<String>,
+        name: Option<String>,
+        location: Option<String>,
+        options: HashMap<String, String>,
+        audit: AuditRESTResponse,
+    ) -> Self {
+        Self {
+            audit,
+            id,
+            name,
+            location,
+            options,
+        }
+    }
+}
+
 /// Response containing configuration defaults.
 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
@@ -65,8 +198,6 @@ pub struct ConfigResponse {
     pub defaults: HashMap<String, String>,
 }
 
-impl RESTResponse for ConfigResponse {}
-
 impl ConfigResponse {
     /// Create a new ConfigResponse.
     pub fn new(defaults: HashMap<String, String>) -> Self {
@@ -97,8 +228,6 @@ pub struct ListDatabasesResponse {
     pub next_page_token: Option<String>,
 }
 
-impl RESTResponse for ListDatabasesResponse {}
-
 impl ListDatabasesResponse {
     /// Create a new ListDatabasesResponse.
     pub fn new(databases: Vec<String>, next_page_token: Option<String>) -> 
Self {
@@ -109,6 +238,26 @@ impl ListDatabasesResponse {
     }
 }
 
+/// Response for listing tables.
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ListTablesResponse {
+    /// List of table names.
+    pub tables: Option<Vec<String>>,
+    /// Token for the next page.
+    pub next_page_token: Option<String>,
+}
+
+impl ListTablesResponse {
+    /// Create a new ListTablesResponse.
+    pub fn new(tables: Option<Vec<String>>, next_page_token: Option<String>) 
-> Self {
+        Self {
+            tables,
+            next_page_token,
+        }
+    }
+}
+
 /// A paginated list of elements with an optional next page token.
 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
@@ -128,7 +277,6 @@ impl<T> PagedList<T> {
         }
     }
 }
-
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -160,4 +308,24 @@ mod tests {
         assert!(json.contains("\"databases\":[\"db1\",\"db2\"]"));
         assert!(json.contains("\"nextPageToken\":\"token123\""));
     }
+
+    #[test]
+    fn test_audit_response_options() {
+        let audit = AuditRESTResponse::new(
+            Some("owner1".to_string()),
+            Some(1000),
+            Some("creator".to_string()),
+            Some(2000),
+            Some("updater".to_string()),
+        );
+
+        let mut options = HashMap::new();
+        audit.put_audit_options_to(&mut options);
+
+        assert_eq!(options.get("owner"), Some(&"owner1".to_string()));
+        assert_eq!(options.get("createdBy"), Some(&"creator".to_string()));
+        assert_eq!(options.get("createdAt"), Some(&"1000".to_string()));
+        assert_eq!(options.get("updatedBy"), Some(&"updater".to_string()));
+        assert_eq!(options.get("updatedAt"), Some(&"2000".to_string()));
+    }
 }
diff --git a/crates/paimon/src/api/auth/base.rs 
b/crates/paimon/src/api/auth/base.rs
index 77e9f76..7b0ae33 100644
--- a/crates/paimon/src/api/auth/base.rs
+++ b/crates/paimon/src/api/auth/base.rs
@@ -19,6 +19,10 @@
 
 use std::collections::HashMap;
 
+use async_trait::async_trait;
+
+use crate::Result;
+
 /// Parameter for REST authentication.
 ///
 /// Contains information about the request being authenticated.
@@ -70,7 +74,8 @@ impl RESTAuthParameter {
 ///
 /// Implement this trait to provide custom authentication mechanisms
 /// for REST API requests.
-pub trait AuthProvider {
+#[async_trait]
+pub trait AuthProvider: Send {
     /// Merge authentication headers into the base headers.
     ///
     /// # Arguments
@@ -78,13 +83,14 @@ pub trait AuthProvider {
     /// * `parameter` - Information about the request being authenticated
     ///
     /// # Returns
-    fn merge_auth_header(
-        &self,
+    async fn merge_auth_header(
+        &mut self,
         base_header: HashMap<String, String>,
         parameter: &RESTAuthParameter,
-    ) -> HashMap<String, String>;
+    ) -> Result<HashMap<String, String>>;
 }
-
+/// Authorization header key.
+pub const AUTHORIZATION_HEADER_KEY: &str = "Authorization";
 /// Function wrapper for REST authentication.
 ///
 /// This struct combines an initial set of headers with an authentication 
provider
@@ -114,8 +120,12 @@ impl RESTAuthFunction {
     ///
     /// # Returns
     /// A HashMap containing the authenticated headers.
-    pub fn apply(&self, parameter: &RESTAuthParameter) -> HashMap<String, 
String> {
+    pub async fn apply(
+        &mut self,
+        parameter: &RESTAuthParameter,
+    ) -> Result<HashMap<String, String>> {
         self.auth_provider
             .merge_auth_header(self.init_header.clone(), parameter)
+            .await
     }
 }
diff --git a/crates/paimon/src/api/auth/bear_provider.rs 
b/crates/paimon/src/api/auth/bearer_provider.rs
similarity index 77%
rename from crates/paimon/src/api/auth/bear_provider.rs
rename to crates/paimon/src/api/auth/bearer_provider.rs
index 96bdfe9..2b9f0e1 100644
--- a/crates/paimon/src/api/auth/bear_provider.rs
+++ b/crates/paimon/src/api/auth/bearer_provider.rs
@@ -19,6 +19,8 @@
 
 use std::collections::HashMap;
 
+use async_trait::async_trait;
+
 use super::base::{AuthProvider, RESTAuthParameter};
 
 /// Authentication provider using Bearer token.
@@ -41,17 +43,18 @@ impl BearerTokenAuthProvider {
     }
 }
 
+#[async_trait]
 impl AuthProvider for BearerTokenAuthProvider {
-    fn merge_auth_header(
-        &self,
+    async fn merge_auth_header(
+        &mut self,
         mut base_header: HashMap<String, String>,
         _parameter: &RESTAuthParameter,
-    ) -> HashMap<String, String> {
+    ) -> crate::Result<HashMap<String, String>> {
         base_header.insert(
             "Authorization".to_string(),
             format!("Bearer {}", self.token),
         );
-        base_header
+        Ok(base_header)
     }
 }
 
@@ -59,13 +62,16 @@ impl AuthProvider for BearerTokenAuthProvider {
 mod tests {
     use super::*;
 
-    #[test]
-    fn test_bearer_token_auth() {
-        let provider = BearerTokenAuthProvider::new("test-token");
+    #[tokio::test]
+    async fn test_bearer_token_auth() {
+        let mut provider = BearerTokenAuthProvider::new("test-token");
         let base_header = HashMap::new();
         let parameter = RESTAuthParameter::for_get("/test", HashMap::new());
 
-        let headers = provider.merge_auth_header(base_header, &parameter);
+        let headers = provider
+            .merge_auth_header(base_header, &parameter)
+            .await
+            .unwrap();
 
         assert_eq!(
             headers.get("Authorization"),
@@ -73,14 +79,17 @@ mod tests {
         );
     }
 
-    #[test]
-    fn test_bearer_token_with_base_headers() {
-        let provider = BearerTokenAuthProvider::new("my-token");
+    #[tokio::test]
+    async fn test_bearer_token_with_base_headers() {
+        let mut provider = BearerTokenAuthProvider::new("my-token");
         let mut base_header = HashMap::new();
         base_header.insert("Content-Type".to_string(), 
"application/json".to_string());
         let parameter = RESTAuthParameter::for_get("/test", HashMap::new());
 
-        let headers = provider.merge_auth_header(base_header, &parameter);
+        let headers = provider
+            .merge_auth_header(base_header, &parameter)
+            .await
+            .unwrap();
 
         assert_eq!(
             headers.get("Authorization"),
diff --git a/crates/paimon/src/api/auth/dlf_provider.rs 
b/crates/paimon/src/api/auth/dlf_provider.rs
new file mode 100644
index 0000000..e5a7ea0
--- /dev/null
+++ b/crates/paimon/src/api/auth/dlf_provider.rs
@@ -0,0 +1,474 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! DLF Authentication Provider for Alibaba Cloud Data Lake Formation.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use chrono::Utc;
+use reqwest::Client;
+use serde::{Deserialize, Serialize};
+
+use super::base::{AuthProvider, RESTAuthParameter, AUTHORIZATION_HEADER_KEY};
+use super::dlf_signer::{DLFRequestSigner, DLFSignerFactory};
+use crate::common::{CatalogOptions, Options};
+use crate::error::Error;
+use crate::Result;
+
+// ============================================================================
+// DLF Token and Token Loader
+// ============================================================================
+
+/// DLF Token containing access credentials for Alibaba Cloud Data Lake 
Formation.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct DLFToken {
+    /// Access key ID for Alibaba Cloud.
+    #[serde(rename = "AccessKeyId")]
+    pub access_key_id: String,
+    /// Access key secret for Alibaba Cloud.
+    #[serde(rename = "AccessKeySecret")]
+    pub access_key_secret: String,
+    /// Security token for temporary credentials (optional).
+    #[serde(rename = "SecurityToken")]
+    pub security_token: Option<String>,
+    /// Expiration timestamp in milliseconds.
+    #[serde(rename = "ExpirationAt", default, skip_serializing)]
+    pub expiration_at_millis: Option<i64>,
+    /// Expiration time string (ISO 8601 format).
+    #[serde(
+        rename = "Expiration",
+        default,
+        skip_serializing_if = "Option::is_none"
+    )]
+    pub expiration: Option<String>,
+}
+
+impl DLFToken {
+    /// Token date format for parsing expiration.
+    const TOKEN_DATE_FORMAT: &'static str = "%Y-%m-%dT%H:%M:%SZ";
+
+    /// Create a new DLFToken.
+    ///
+    /// # Arguments
+    /// * `access_key_id` - The access key ID
+    /// * `access_key_secret` - The access key secret
+    /// * `security_token` - Optional security token
+    /// * `expiration` - Optional expiration time string (ISO 8601 format)
+    /// * `expiration_at_millis` - Optional expiration timestamp in 
milliseconds.
+    ///   If provided, this value is used directly. Otherwise, it will be 
parsed from `expiration`.
+    pub fn new(
+        access_key_id: impl Into<String>,
+        access_key_secret: impl Into<String>,
+        security_token: Option<String>,
+        expiration_at_millis: Option<i64>,
+        expiration: Option<String>,
+    ) -> Self {
+        let access_key_id = access_key_id.into();
+        let access_key_secret = access_key_secret.into();
+
+        // Use provided expiration_at_millis, or parse from expiration string 
if not provided
+        let expiration_at_millis = expiration_at_millis.or_else(|| {
+            expiration
+                .as_deref()
+                .and_then(Self::parse_expiration_to_millis)
+        });
+
+        Self {
+            access_key_id,
+            access_key_secret,
+            security_token,
+            expiration_at_millis,
+            expiration,
+        }
+    }
+
+    /// Create a DLFToken from configuration options.
+    pub fn from_options(options: &Options) -> Option<Self> {
+        let access_key_id = 
options.get(CatalogOptions::DLF_ACCESS_KEY_ID)?.clone();
+        let access_key_secret = 
options.get(CatalogOptions::DLF_ACCESS_KEY_SECRET)?.clone();
+        let security_token = options
+            .get(CatalogOptions::DLF_ACCESS_SECURITY_TOKEN)
+            .cloned();
+
+        Some(Self::new(
+            access_key_id,
+            access_key_secret,
+            security_token,
+            None,
+            None,
+        ))
+    }
+
+    /// Parse expiration string to milliseconds timestamp.
+    pub fn parse_expiration_to_millis(expiration: &str) -> Option<i64> {
+        let datetime = chrono::NaiveDateTime::parse_from_str(expiration, 
Self::TOKEN_DATE_FORMAT)
+            .ok()?
+            .and_utc();
+        Some(datetime.timestamp_millis())
+    }
+}
+/// Trait for DLF token loaders.
+#[async_trait]
+pub trait DLFTokenLoader: Send + Sync {
+    /// Load a DLF token.
+    async fn load_token(&self) -> Result<DLFToken>;
+
+    /// Get a description of the loader.
+    fn description(&self) -> &str;
+}
+
+/// DLF ECS Token Loader.
+///
+/// Loads DLF tokens from ECS metadata service.
+///
+/// This implementation mirrors the Python DLFECSTokenLoader class,
+/// using class-level HTTP client for connection reuse and retry logic.
+pub struct DLFECSTokenLoader {
+    ecs_metadata_url: String,
+    role_name: Option<String>,
+    http_client: TokenHTTPClient,
+}
+
+impl DLFECSTokenLoader {
+    /// Create a new DLFECSTokenLoader.
+    ///
+    /// # Arguments
+    /// * `ecs_metadata_url` - ECS metadata service URL
+    /// * `role_name` - Optional role name. If None, will be fetched from 
metadata service
+    pub fn new(ecs_metadata_url: impl Into<String>, role_name: Option<String>) 
-> Self {
+        Self {
+            ecs_metadata_url: ecs_metadata_url.into(),
+            role_name,
+            http_client: TokenHTTPClient::new(),
+        }
+    }
+
+    /// Get the role name from ECS metadata service.
+    async fn get_role(&self) -> Result<String> {
+        self.http_client.get(&self.ecs_metadata_url).await
+    }
+
+    /// Get the token from ECS metadata service.
+    async fn get_token(&self, url: &str) -> Result<DLFToken> {
+        let token_json = self.http_client.get(url).await?;
+        serde_json::from_str(&token_json).map_err(|e| Error::DataInvalid {
+            message: format!("Failed to parse token JSON: {}", e),
+            source: None,
+        })
+    }
+
+    /// Build the token URL from base URL and role name.
+    fn build_token_url(&self, role_name: &str) -> String {
+        let base_url = self.ecs_metadata_url.trim_end_matches('/');
+        format!("{}/{}", base_url, role_name)
+    }
+}
+
+#[async_trait]
+impl DLFTokenLoader for DLFECSTokenLoader {
+    async fn load_token(&self) -> Result<DLFToken> {
+        let role_name = match &self.role_name {
+            Some(name) => name.clone(),
+            None => {
+                // Fetch role name from metadata service
+                self.get_role().await?
+            }
+        };
+
+        // Build token URL
+        let token_url = self.build_token_url(&role_name);
+
+        // Get token
+        self.get_token(&token_url).await
+    }
+
+    fn description(&self) -> &str {
+        &self.ecs_metadata_url
+    }
+}
+/// Factory for creating DLF token loaders.
+pub struct DLFTokenLoaderFactory;
+
+impl DLFTokenLoaderFactory {
+    /// Create a token loader based on options.
+    pub fn create_token_loader(options: &Options) -> Option<Arc<dyn 
DLFTokenLoader>> {
+        let loader = options.get(CatalogOptions::DLF_TOKEN_LOADER)?;
+
+        if loader == "ecs" {
+            let ecs_metadata_url = options
+                .get(CatalogOptions::DLF_TOKEN_ECS_METADATA_URL)
+                .cloned()
+                .unwrap_or_else(|| {
+                    
"http://100.100.100.200/latest/meta-data/Ram/security-credentials/".to_string()
+                });
+            let role_name = options
+                .get(CatalogOptions::DLF_TOKEN_ECS_ROLE_NAME)
+                .cloned();
+            Some(
+                Arc::new(DLFECSTokenLoader::new(ecs_metadata_url, role_name))
+                    as Arc<dyn DLFTokenLoader>,
+            )
+        } else {
+            None
+        }
+    }
+}
+// ============================================================================
+// DLF Auth Provider
+// ============================================================================
+
+/// Token expiration safe time in milliseconds (1 hour).
+/// Token will be refreshed if it expires within this time.
+const TOKEN_EXPIRATION_SAFE_TIME_MILLIS: i64 = 3_600_000;
+
+/// DLF Authentication Provider for Alibaba Cloud Data Lake Formation.
+///
+/// This provider implements authentication for Alibaba Cloud DLF service,
+/// supporting both VPC endpoints (DLF4-HMAC-SHA256) and public endpoints
+/// (ROA v2 HMAC-SHA1).
+pub struct DLFAuthProvider {
+    uri: String,
+    token: Option<DLFToken>,
+    token_loader: Option<Arc<dyn DLFTokenLoader>>,
+    signer: Box<dyn DLFRequestSigner>,
+}
+
impl DLFAuthProvider {
    /// Create a new DLFAuthProvider.
    ///
    /// # Arguments
    /// * `uri` - The DLF service URI
    /// * `region` - Alibaba Cloud region id, used in the credential scope
    /// * `signing_algorithm` - "openapi" selects the ROA v2 HMAC-SHA1 signer;
    ///   any other value selects the default DLF4-HMAC-SHA256 signer
    /// * `token` - Optional DLF token containing access credentials
    /// * `token_loader` - Optional token loader for dynamic token retrieval
    ///
    /// # Errors
    /// Returns an error if both `token` and `token_loader` are `None`.
    pub fn new(
        uri: impl Into<String>,
        region: impl Into<String>,
        signing_algorithm: impl Into<String>,
        token: Option<DLFToken>,
        token_loader: Option<Arc<dyn DLFTokenLoader>>,
    ) -> Result<Self> {
        // At least one credential source must exist or every request would fail.
        if token.is_none() && token_loader.is_none() {
            return Err(Error::ConfigInvalid {
                message: "Either token or token_loader must be provided".to_string(),
            });
        }

        let uri = uri.into();
        let region = region.into();
        let signing_algorithm = signing_algorithm.into();
        let signer = DLFSignerFactory::create_signer(&signing_algorithm, &region);

        Ok(Self {
            uri,
            token,
            token_loader,
            signer,
        })
    }

    /// Get or refresh the token.
    ///
    /// If token_loader is configured, this method will:
    /// - Load a new token if current token is None
    /// - Refresh the token if it's about to expire (within
    ///   TOKEN_EXPIRATION_SAFE_TIME_MILLIS)
    ///
    /// A token without an expiration timestamp is treated as non-expiring.
    async fn get_or_refresh_token(&mut self) -> Result<DLFToken> {
        if let Some(loader) = &self.token_loader {
            let need_reload = match &self.token {
                None => true,
                Some(token) => match token.expiration_at_millis {
                    Some(expiration_at_millis) => {
                        // Refresh ahead of the actual expiry so in-flight
                        // requests never carry an expired credential.
                        let now = chrono::Utc::now().timestamp_millis();
                        expiration_at_millis - now < TOKEN_EXPIRATION_SAFE_TIME_MILLIS
                    }
                    None => false,
                },
            };

            if need_reload {
                let new_token = loader.load_token().await?;
                self.token = Some(new_token);
            }
        }

        // Should be unreachable given the constructor invariant (a token or a
        // successful loader), but guarded defensively instead of panicking.
        self.token.clone().ok_or_else(|| Error::DataInvalid {
            message: "Either token or token_loader must be provided".to_string(),
            source: None,
        })
    }

    /// Extract host from URI.
    ///
    /// Strips a leading "https://" or "http://" scheme and truncates at the
    /// first '/'; a port, if present, remains part of the returned host.
    fn extract_host(uri: &str) -> String {
        let without_protocol = uri
            .strip_prefix("https://")
            .or_else(|| uri.strip_prefix("http://"))
            .unwrap_or(uri);

        let path_index = without_protocol.find('/').unwrap_or(without_protocol.len());
        without_protocol[..path_index].to_string()
    }
}
+
+#[async_trait]
+impl AuthProvider for DLFAuthProvider {
+    async fn merge_auth_header(
+        &mut self,
+        mut base_header: HashMap<String, String>,
+        rest_auth_parameter: &RESTAuthParameter,
+    ) -> crate::Result<HashMap<String, String>> {
+        // Get token (will auto-refresh if needed via token_loader)
+        let token = self.get_or_refresh_token().await?;
+
+        let now = Utc::now();
+        let host = Self::extract_host(&self.uri);
+
+        // Generate signature headers
+        let sign_headers = self.signer.sign_headers(
+            rest_auth_parameter.data.as_deref(),
+            &now,
+            token.security_token.as_deref(),
+            &host,
+        );
+
+        // Generate authorization header
+        let authorization =
+            self.signer
+                .authorization(rest_auth_parameter, &token, &host, 
&sign_headers);
+
+        // Merge all headers
+        base_header.extend(sign_headers);
+        base_header.insert(AUTHORIZATION_HEADER_KEY.to_string(), 
authorization);
+
+        Ok(base_header)
+    }
+}
+
+// ============================================================================
+// DLF Token Loader Implementation
+// ============================================================================
+
+/// HTTP client for token loading with retry and timeout configuration.
+struct TokenHTTPClient {
+    max_retries: u32,
+    client: Client,
+}
+
+impl TokenHTTPClient {
+    /// Create a new HTTP client with default settings.
+    fn new() -> Self {
+        let connect_timeout = std::time::Duration::from_secs(180); // 3 minutes
+        let read_timeout = std::time::Duration::from_secs(180); // 3 minutes
+
+        let client = Client::builder()
+            .timeout(read_timeout)
+            .connect_timeout(connect_timeout)
+            .build()
+            .expect("Failed to create HTTP client");
+
+        Self {
+            max_retries: 3,
+            client,
+        }
+    }
+
+    /// Perform HTTP GET request with retry logic.
+    async fn get(&self, url: &str) -> Result<String> {
+        let mut last_error = String::new();
+        for attempt in 0..self.max_retries {
+            match self.client.get(url).send().await {
+                Ok(response) if response.status().is_success() => {
+                    return response.text().await.map_err(|e| 
Error::DataInvalid {
+                        message: format!("Failed to read response: {}", e),
+                        source: None,
+                    });
+                }
+                Ok(response) => {
+                    last_error = format!("HTTP error: {}", response.status());
+                }
+                Err(e) => {
+                    last_error = format!("Request failed: {}", e);
+                }
+            }
+
+            if attempt < self.max_retries - 1 {
+                // Exponential backoff
+                let delay = std::time::Duration::from_millis(100 * 
2u64.pow(attempt));
+                tokio::time::sleep(delay).await;
+            }
+        }
+
+        Err(Error::DataInvalid {
+            message: last_error,
+            source: None,
+        })
+    }
+}
+
#[cfg(test)]
mod tests {
    use super::*;

    // Host extraction drops the scheme and any path suffix.
    #[test]
    fn test_extract_host() {
        let uri = "http://dlf-abcdfgerrf.net/api/v1";
        let host = DLFAuthProvider::extract_host(uri);
        assert_eq!(host, "dlf-abcdfgerrf.net");
    }

    // A bare https endpoint with no path yields the host unchanged.
    #[test]
    fn test_extract_host_no_path() {
        let uri = "https://dlf.cn-abcdfgerrf.aliyuncs.com";
        let host = DLFAuthProvider::extract_host(uri);
        assert_eq!(host, "dlf.cn-abcdfgerrf.aliyuncs.com");
    }

    // All three credential options populate the token, including the
    // optional STS security token.
    #[test]
    fn test_dlf_token_from_options() {
        let mut options = Options::new();
        options.set(CatalogOptions::DLF_ACCESS_KEY_ID, "test_key_id");
        options.set(CatalogOptions::DLF_ACCESS_KEY_SECRET, "test_key_secret");
        options.set(
            CatalogOptions::DLF_ACCESS_SECURITY_TOKEN,
            "test_security_token",
        );

        let token = DLFToken::from_options(&options).unwrap();
        assert_eq!(token.access_key_id, "test_key_id");
        assert_eq!(token.access_key_secret, "test_key_secret");
        assert_eq!(
            token.security_token,
            Some("test_security_token".to_string())
        );
    }

    // Without access key credentials no token can be built.
    #[test]
    fn test_dlf_token_missing_credentials() {
        let options = Options::new();
        assert!(DLFToken::from_options(&options).is_none());
    }

    // An RFC 3339 / ISO-8601 expiration string parses to epoch milliseconds.
    #[test]
    fn test_parse_expiration() {
        let expiration = "2024-12-31T23:59:59Z";
        let millis = DLFToken::parse_expiration_to_millis(expiration);
        assert!(millis.is_some());
    }
}
diff --git a/crates/paimon/src/api/auth/dlf_signer.rs 
b/crates/paimon/src/api/auth/dlf_signer.rs
new file mode 100644
index 0000000..133970b
--- /dev/null
+++ b/crates/paimon/src/api/auth/dlf_signer.rs
@@ -0,0 +1,650 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! DLF Request Signer implementations for Alibaba Cloud Data Lake Formation.
+//!
+//! This module provides two signature algorithms for authenticating requests
+//! to Alibaba Cloud Data Lake Formation (DLF) service:
+//!
+//! # Signature Algorithms
+//!
+//! ## 1. DLF4-HMAC-SHA256 (Default Signer)
+//!
+//! Used for VPC endpoints (e.g., `*-vpc.dlf.aliyuncs.com`).
+//!
+//! **Algorithm Overview:**
+//! 1. Build a canonical request string from HTTP method, path, query params, 
and headers
+//! 2. Create a string-to-sign with algorithm, timestamp, credential scope, 
and hashed canonical request
+//! 3. Derive a signing key through multiple HMAC-SHA256 operations
+//! 4. Calculate the signature and construct the Authorization header
+//!
+//! **Signing Key Derivation:**
+//! ```text
+//! kSecret = "aliyun_v4" + AccessKeySecret
+//! kDate = HMAC-SHA256(kSecret, Date)
+//! kRegion = HMAC-SHA256(kDate, Region)
+//! kService = HMAC-SHA256(kRegion, "DlfNext")
+//! kSigning = HMAC-SHA256(kService, "aliyun_v4_request")
+//! ```
+//!
+//! **Authorization Header Format:**
+//! ```text
+//! DLF4-HMAC-SHA256 
Credential=AccessKeyId/Date/Region/DlfNext/aliyun_v4_request,Signature=hex_signature
+//! ```
+//!
+//! ## 2. HMAC-SHA1 (OpenAPI Signer)
+//!
+//! Used for public network endpoints (e.g., `dlfnext.*.aliyuncs.com`).
+//! Follows Alibaba Cloud ROA v2 signature style.
+//!
+//! **Algorithm Overview:**
+//! 1. Build canonicalized headers from x-acs-* headers
+//! 2. Build canonicalized resource from path and query params
+//! 3. Create string-to-sign with method, headers, and resource
+//! 4. Calculate signature using HMAC-SHA1
+//!
+//! **Authorization Header Format:**
+//! ```text
+//! acs AccessKeyId:base64_signature
+//! ```
+//!
+//! # References
+//!
+//! - [Alibaba Cloud API 
Signature](https://help.aliyun.com/document_detail/315526.html)
+//! - [DLF OpenAPI](https://help.aliyun.com/document_detail/197826.html)
+//!
+//! # Usage
+//!
+//! The signer is automatically selected based on the endpoint URI:
+//! - VPC endpoints → `DLFDefaultSigner` (DLF4-HMAC-SHA256)
+//! - Public endpoints with "dlfnext" or "openapi" → `DLFOpenApiSigner` 
(HMAC-SHA1)
+
+use std::collections::HashMap;
+
+use base64::{engine::general_purpose::STANDARD as BASE64_STANDARD, Engine};
+use chrono::{DateTime, Utc};
+use hmac::{Hmac, Mac};
+use md5::Md5;
+use sha1::Sha1;
+use sha2::{Digest, Sha256};
+use uuid::Uuid;
+
+use super::base::RESTAuthParameter;
+use super::dlf_provider::DLFToken;
+
+type HmacSha256 = Hmac<Sha256>;
+type HmacSha1 = Hmac<Sha1>;
+
/// Trait for DLF request signers.
///
/// Different signers implement different signature algorithms for
/// authenticating requests to Alibaba Cloud DLF service.
///
/// # Implementations
///
/// - [`DLFDefaultSigner`]: Uses DLF4-HMAC-SHA256 for VPC endpoints
/// - [`DLFOpenApiSigner`]: Uses HMAC-SHA1 for public endpoints
pub trait DLFRequestSigner: Send + Sync {
    /// Generate signature headers for the request.
    ///
    /// * `body` - request payload, if any; drives Content-MD5/Content-Type
    /// * `now` - timestamp embedded into the signed headers
    /// * `security_token` - optional STS token for temporary credentials
    /// * `host` - endpoint host (used by the OpenAPI signer's Host header;
    ///   ignored by the default signer)
    fn sign_headers(
        &self,
        body: Option<&str>,
        now: &DateTime<Utc>,
        security_token: Option<&str>,
        host: &str,
    ) -> HashMap<String, String>;

    /// Generate the Authorization header value.
    ///
    /// `sign_headers` must be the map previously produced by
    /// [`Self::sign_headers`] for the same request.
    fn authorization(
        &self,
        rest_auth_parameter: &RESTAuthParameter,
        token: &DLFToken,
        host: &str,
        sign_headers: &HashMap<String, String>,
    ) -> String;

    /// Get the identifier for this signer.
    #[allow(dead_code)]
    fn identifier(&self) -> &str;
}
+
+/// Default DLF signer using DLF4-HMAC-SHA256 algorithm.
+///
+/// This signer is used for VPC endpoints (e.g., 
`cn-hangzhou-vpc.dlf.aliyuncs.com`).
+///
+/// # Algorithm Details
+///
+/// The DLF4-HMAC-SHA256 algorithm is similar to AWS Signature Version 4:
+///
+/// 1. **Canonical Request**: Combine HTTP method, URI, query string, headers, 
and payload hash
+/// 2. **String-to-Sign**: Include algorithm, timestamp, credential scope, and 
canonical request hash
+/// 3. **Signing Key**: Derived through chained HMAC operations
+/// 4. **Signature**: HMAC-SHA256 of string-to-sign using the signing key
+///
+/// # Required Headers
+///
+/// The following headers are included in the signature calculation:
+/// - `content-md5`: MD5 hash of request body (if present)
+/// - `content-type`: Media type (if body present)
+/// - `x-dlf-content-sha256`: Always "UNSIGNED-PAYLOAD"
+/// - `x-dlf-date`: Request timestamp in format `%Y%m%dT%H%M%SZ`
+/// - `x-dlf-version`: API version ("v1")
+/// - `x-dlf-security-token`: Security token for temporary credentials 
(optional)
+///
+/// # Example
+///
+/// ```ignore
+/// use paimon::api::auth::{DLFDefaultSigner, DLFRequestSigner};
+///
+/// let signer = DLFDefaultSigner::new("cn-hangzhou");
+/// let headers = signer.sign_headers(Some(r#"{"key":"value"}"#), &Utc::now(), 
None, "dlf.aliyuncs.com");
+/// ```
pub struct DLFDefaultSigner {
    /// Alibaba Cloud region id (e.g. "cn-hangzhou"); part of the credential
    /// scope and of the signing-key derivation chain.
    region: String,
}

impl DLFDefaultSigner {
    pub const IDENTIFIER: &'static str = "default";
    const VERSION: &'static str = "v1";
    const SIGNATURE_ALGORITHM: &'static str = "DLF4-HMAC-SHA256";
    const PRODUCT: &'static str = "DlfNext";
    const REQUEST_TYPE: &'static str = "aliyun_v4_request";
    const SIGNATURE_KEY: &'static str = "Signature";
    const NEW_LINE: &'static str = "\n";

    // Header keys
    const DLF_CONTENT_MD5_HEADER_KEY: &'static str = "Content-MD5";
    const DLF_CONTENT_TYPE_KEY: &'static str = "Content-Type";
    const DLF_DATE_HEADER_KEY: &'static str = "x-dlf-date";
    const DLF_SECURITY_TOKEN_HEADER_KEY: &'static str = "x-dlf-security-token";
    const DLF_AUTH_VERSION_HEADER_KEY: &'static str = "x-dlf-version";
    const DLF_CONTENT_SHA256_HEADER_KEY: &'static str = "x-dlf-content-sha256";
    const DLF_CONTENT_SHA256_VALUE: &'static str = "UNSIGNED-PAYLOAD";

    const AUTH_DATE_TIME_FORMAT: &'static str = "%Y%m%dT%H%M%SZ";
    const MEDIA_TYPE: &'static str = "application/json";

    // Lower-cased names of the headers that participate in the canonical
    // request; any other header is excluded from the signature.
    const SIGNED_HEADERS: &'static [&'static str] = &[
        "content-md5",
        "content-type",
        "x-dlf-content-sha256",
        "x-dlf-date",
        "x-dlf-version",
        "x-dlf-security-token",
    ];

    /// Create a new DLFDefaultSigner with the given region.
    pub fn new(region: impl Into<String>) -> Self {
        Self {
            region: region.into(),
        }
    }

    /// Base64-encoded MD5 digest, used for the Content-MD5 header.
    fn md5_base64(raw: &str) -> String {
        let mut hasher = Md5::new();
        hasher.update(raw.as_bytes());
        let hash = hasher.finalize();
        BASE64_STANDARD.encode(hash)
    }

    /// Raw HMAC-SHA256 digest of `data` under `key`.
    fn hmac_sha256(key: &[u8], data: &str) -> Vec<u8> {
        let mut mac = HmacSha256::new_from_slice(key).expect("HMAC can take key of any size");
        mac.update(data.as_bytes());
        mac.finalize().into_bytes().to_vec()
    }

    /// Hex-encoded SHA-256 digest of `raw`.
    fn sha256_hex(raw: &str) -> String {
        let mut hasher = Sha256::new();
        hasher.update(raw.as_bytes());
        hex::encode(hasher.finalize())
    }

    fn hex_encode(raw: &[u8]) -> String {
        hex::encode(raw)
    }

    fn trim(value: &str) -> &str {
        value.trim()
    }

    /// Assemble the canonical request: method, path, canonical query string,
    /// each signed "key:value" header (sorted by key), and finally the
    /// content SHA-256 value — all newline-separated.
    fn get_canonical_request(
        &self,
        rest_auth_parameter: &RESTAuthParameter,
        headers: &HashMap<String, String>,
    ) -> String {
        let mut parts = vec![
            rest_auth_parameter.method.clone(),
            rest_auth_parameter.path.clone(),
        ];

        let canonical_query_string =
            self.build_canonical_query_string(&rest_auth_parameter.parameters);
        parts.push(canonical_query_string);

        let sorted_headers = self.build_sorted_signed_headers_map(headers);
        for (key, value) in sorted_headers {
            parts.push(format!("{}:{}", key, value));
        }

        // Falls back to "UNSIGNED-PAYLOAD"; sign_headers() always inserts
        // this header, so the fallback only covers maps built elsewhere.
        let content_sha256 = headers
            .get(Self::DLF_CONTENT_SHA256_HEADER_KEY)
            .map(|s| s.as_str())
            .unwrap_or(Self::DLF_CONTENT_SHA256_VALUE);
        parts.push(content_sha256.to_string());

        parts.join(Self::NEW_LINE)
    }

    /// Build "k=v&k2=v2" from parameters sorted by key; a parameter with an
    /// empty value is rendered as a bare key. Keys/values are trimmed but
    /// NOT percent-encoded here.
    fn build_canonical_query_string(&self, parameters: &HashMap<String, String>) -> String {
        if parameters.is_empty() {
            return String::new();
        }

        let mut sorted_params: Vec<_> = parameters.iter().collect();
        sorted_params.sort_by(|a, b| a.0.cmp(b.0));

        let query_parts: Vec<String> = sorted_params
            .iter()
            .map(|(key, value)| {
                let key = Self::trim(key);
                if !value.is_empty() {
                    let value = Self::trim(value);
                    format!("{}={}", key, value)
                } else {
                    key.to_string()
                }
            })
            .collect();

        query_parts.join("&")
    }

    /// Lower-case, trim, and sort (by key) the subset of headers listed in
    /// SIGNED_HEADERS, ready for canonical-request assembly.
    fn build_sorted_signed_headers_map(
        &self,
        headers: &HashMap<String, String>,
    ) -> Vec<(String, String)> {
        let mut sorted_headers: Vec<(String, String)> = headers
            .iter()
            .filter(|(key, _)| {
                let lower_key = key.to_lowercase();
                Self::SIGNED_HEADERS.contains(&lower_key.as_str())
            })
            .map(|(key, value)| (key.to_lowercase(), Self::trim(value).to_string()))
            .collect();

        sorted_headers.sort_by(|a, b| a.0.cmp(&b.0));
        sorted_headers
    }
}
+
impl DLFRequestSigner for DLFDefaultSigner {
    /// Produce the x-dlf-* signed headers: date, the UNSIGNED-PAYLOAD
    /// marker, the API version, Content-Type/Content-MD5 for non-empty
    /// bodies, and the STS security token when present.
    fn sign_headers(
        &self,
        body: Option<&str>,
        now: &DateTime<Utc>,
        security_token: Option<&str>,
        _host: &str,
    ) -> HashMap<String, String> {
        let mut sign_headers = HashMap::new();

        // Timestamp format "%Y%m%dT%H%M%SZ"; its first 8 chars (yyyymmdd)
        // are reused as the credential-scope date in authorization().
        let date_time = now.format(Self::AUTH_DATE_TIME_FORMAT).to_string();
        sign_headers.insert(Self::DLF_DATE_HEADER_KEY.to_string(), date_time);
        sign_headers.insert(
            Self::DLF_CONTENT_SHA256_HEADER_KEY.to_string(),
            Self::DLF_CONTENT_SHA256_VALUE.to_string(),
        );
        sign_headers.insert(
            Self::DLF_AUTH_VERSION_HEADER_KEY.to_string(),
            Self::VERSION.to_string(),
        );

        if let Some(body_content) = body {
            if !body_content.is_empty() {
                sign_headers.insert(
                    Self::DLF_CONTENT_TYPE_KEY.to_string(),
                    Self::MEDIA_TYPE.to_string(),
                );
                sign_headers.insert(
                    Self::DLF_CONTENT_MD5_HEADER_KEY.to_string(),
                    Self::md5_base64(body_content),
                );
            }
        }

        if let Some(token) = security_token {
            sign_headers.insert(
                Self::DLF_SECURITY_TOKEN_HEADER_KEY.to_string(),
                token.to_string(),
            );
        }

        sign_headers
    }

    /// Build the DLF4-HMAC-SHA256 Authorization value.
    ///
    /// string-to-sign = algorithm \n timestamp \n
    ///   date/region/DlfNext/aliyun_v4_request \n SHA256(canonical request),
    /// signed with a key derived by chained HMACs:
    /// "aliyun_v4"+secret → date → region → product → request type.
    fn authorization(
        &self,
        rest_auth_parameter: &RESTAuthParameter,
        token: &DLFToken,
        _host: &str,
        sign_headers: &HashMap<String, String>,
    ) -> String {
        // sign_headers() always inserts x-dlf-date, so this lookup cannot
        // fail for maps it produced.
        let date_time = sign_headers.get(Self::DLF_DATE_HEADER_KEY).unwrap();
        // yyyymmdd prefix of the %Y%m%dT%H%M%SZ timestamp.
        let date = &date_time[..8];

        let canonical_request = self.get_canonical_request(rest_auth_parameter, sign_headers);

        let string_to_sign = [
            Self::SIGNATURE_ALGORITHM.to_string(),
            date_time.clone(),
            format!(
                "{}/{}/{}/{}",
                date,
                self.region,
                Self::PRODUCT,
                Self::REQUEST_TYPE
            ),
            Self::sha256_hex(&canonical_request),
        ]
        .join(Self::NEW_LINE);

        // Derive signing key
        let date_key = Self::hmac_sha256(
            format!("aliyun_v4{}", token.access_key_secret).as_bytes(),
            date,
        );
        let date_region_key = Self::hmac_sha256(&date_key, &self.region);
        let date_region_service_key = Self::hmac_sha256(&date_region_key, Self::PRODUCT);
        let signing_key = Self::hmac_sha256(&date_region_service_key, Self::REQUEST_TYPE);

        let signature_bytes = Self::hmac_sha256(&signing_key, &string_to_sign);
        let signature = Self::hex_encode(&signature_bytes);

        format!(
            "{} Credential={}/{}/{}/{}/{},{}={}",
            Self::SIGNATURE_ALGORITHM,
            token.access_key_id,
            date,
            self.region,
            Self::PRODUCT,
            Self::REQUEST_TYPE,
            Self::SIGNATURE_KEY,
            signature
        )
    }

    fn identifier(&self) -> &str {
        Self::IDENTIFIER
    }
}
+
+/// DLF OpenAPI signer using HMAC-SHA1 algorithm.
+///
+/// This signer follows the Alibaba Cloud ROA v2 signature style and is used
+/// for public network endpoints (e.g., `dlfnext.cn-asdnbhwf.aliyuncs.com`).
+///
+/// # Algorithm Details
+///
+/// The HMAC-SHA1 algorithm is the traditional Alibaba Cloud API signature 
method:
+///
+/// 1. **Canonicalized Headers**: Sort and format all `x-acs-*` headers
+/// 2. **Canonicalized Resource**: URL-decoded path with sorted query 
parameters
+/// 3. **String-to-Sign**: Combine HTTP method, standard headers, 
canonicalized headers, and resource
+/// 4. **Signature**: Base64-encoded HMAC-SHA1 of string-to-sign
+///
+/// # Required Headers
+///
+/// The following headers are included in requests:
+/// - `Date`: Request timestamp in RFC 1123 format
+/// - `Accept`: Always "application/json"
+/// - `Content-MD5`: MD5 hash of request body (if present)
+/// - `Content-Type`: Always "application/json" (if body present)
+/// - `Host`: Endpoint host
+/// - `x-acs-signature-method`: Always "HMAC-SHA1"
+/// - `x-acs-signature-nonce`: Unique UUID for each request
+/// - `x-acs-signature-version`: Always "1.0"
+/// - `x-acs-version`: API version ("2026-01-18")
+/// - `x-acs-security-token`: Security token for temporary credentials 
(optional)
+///
+/// # Example
+///
+/// ```ignore
+/// use paimon::api::auth::{DLFOpenApiSigner, DLFRequestSigner};
+///
+/// let signer = DLFOpenApiSigner;
+/// let headers = signer.sign_headers(Some(r#"{"key":"value"}"#), &Utc::now(), 
None, "dlfnext.aliyuncs.com");
+/// ```
pub struct DLFOpenApiSigner;

impl DLFOpenApiSigner {
    pub const IDENTIFIER: &'static str = "openapi";

    // Header constants
    const DATE_HEADER: &'static str = "Date";
    const ACCEPT_HEADER: &'static str = "Accept";
    const CONTENT_MD5_HEADER: &'static str = "Content-MD5";
    const CONTENT_TYPE_HEADER: &'static str = "Content-Type";
    const HOST_HEADER: &'static str = "Host";
    const X_ACS_SIGNATURE_METHOD: &'static str = "x-acs-signature-method";
    const X_ACS_SIGNATURE_NONCE: &'static str = "x-acs-signature-nonce";
    const X_ACS_SIGNATURE_VERSION: &'static str = "x-acs-signature-version";
    const X_ACS_VERSION: &'static str = "x-acs-version";
    const X_ACS_SECURITY_TOKEN: &'static str = "x-acs-security-token";

    // Values
    const DATE_FORMAT: &'static str = "%a, %d %b %Y %H:%M:%S GMT";
    const ACCEPT_VALUE: &'static str = "application/json";
    const CONTENT_TYPE_VALUE: &'static str = "application/json";
    const SIGNATURE_METHOD_VALUE: &'static str = "HMAC-SHA1";
    const SIGNATURE_VERSION_VALUE: &'static str = "1.0";
    const API_VERSION: &'static str = "2026-01-18";

    /// Base64-encoded MD5 digest, used for the Content-MD5 header.
    fn md5_base64(data: &str) -> String {
        let mut hasher = Md5::new();
        hasher.update(data.as_bytes());
        let hash = hasher.finalize();
        BASE64_STANDARD.encode(hash)
    }

    /// Base64-encoded HMAC-SHA1 signature of `data` under `key`.
    fn hmac_sha1_base64(key: &str, data: &str) -> String {
        let mut mac =
            HmacSha1::new_from_slice(key.as_bytes()).expect("HMAC can take key of any size");
        mac.update(data.as_bytes());
        BASE64_STANDARD.encode(mac.finalize().into_bytes())
    }

    fn trim(value: &str) -> &str {
        value.trim()
    }

    /// Render the "x-acs-*" headers as sorted "key:value\n" lines
    /// (keys lower-cased, values trimmed). Empty when no such header exists.
    fn build_canonicalized_headers(&self, headers: &HashMap<String, String>) -> String {
        let mut sorted_headers: Vec<(String, String)> = headers
            .iter()
            .filter(|(key, _)| key.to_lowercase().starts_with("x-acs-"))
            .map(|(key, value)| (key.to_lowercase(), Self::trim(value).to_string()))
            .collect();

        sorted_headers.sort_by(|a, b| a.0.cmp(&b.0));

        let mut result = String::new();
        for (key, value) in sorted_headers {
            result.push_str(&format!("{}:{}\n", key, value));
        }
        result
    }

    /// URL-decoded path, optionally followed by "?" and the sorted,
    /// URL-decoded query parameters (bare key when the value is empty).
    fn build_canonicalized_resource(&self, rest_auth_parameter: &RESTAuthParameter) -> String {
        // NOTE(review): a malformed percent-encoding collapses to "" via
        // unwrap_or_default — confirm this matches the Java implementation.
        let path = urlencoding::decode(&rest_auth_parameter.path).unwrap_or_default();

        if rest_auth_parameter.parameters.is_empty() {
            return path.to_string();
        }

        let mut sorted_params: Vec<_> = rest_auth_parameter.parameters.iter().collect();
        sorted_params.sort_by(|a, b| a.0.cmp(b.0));

        let query_parts: Vec<String> = sorted_params
            .iter()
            .map(|(key, value)| {
                let decoded_value = urlencoding::decode(value).unwrap_or_default();
                if !decoded_value.is_empty() {
                    format!("{}={}", key, decoded_value)
                } else {
                    key.to_string()
                }
            })
            .collect();

        format!("{}?{}", path, query_parts.join("&"))
    }

    /// ROA string-to-sign: METHOD \n Accept \n Content-MD5 \n Content-Type
    /// \n Date \n canonicalized-headers, directly followed by the
    /// canonicalized resource (no extra separator — the canonicalized
    /// headers string already ends with '\n' when non-empty).
    fn build_string_to_sign(
        &self,
        rest_auth_parameter: &RESTAuthParameter,
        headers: &HashMap<String, String>,
        canonicalized_headers: &str,
        canonicalized_resource: &str,
    ) -> String {
        let parts = [
            rest_auth_parameter.method.clone(),
            headers
                .get(Self::ACCEPT_HEADER)
                .cloned()
                .unwrap_or_default(),
            headers
                .get(Self::CONTENT_MD5_HEADER)
                .cloned()
                .unwrap_or_default(),
            headers
                .get(Self::CONTENT_TYPE_HEADER)
                .cloned()
                .unwrap_or_default(),
            headers.get(Self::DATE_HEADER).cloned().unwrap_or_default(),
            canonicalized_headers.to_string(),
        ];

        parts.join("\n") + canonicalized_resource
    }
}
+
impl DLFRequestSigner for DLFOpenApiSigner {
    /// Produce the ROA-style request headers: Date (RFC 1123), Accept, Host,
    /// the x-acs-* signature metadata, Content-MD5/Content-Type for
    /// non-empty bodies, and the optional STS security token.
    fn sign_headers(
        &self,
        body: Option<&str>,
        now: &DateTime<Utc>,
        security_token: Option<&str>,
        host: &str,
    ) -> HashMap<String, String> {
        let mut headers = HashMap::new();

        // Date header in RFC 1123 format
        headers.insert(
            Self::DATE_HEADER.to_string(),
            now.format(Self::DATE_FORMAT).to_string(),
        );

        // Accept header
        headers.insert(
            Self::ACCEPT_HEADER.to_string(),
            Self::ACCEPT_VALUE.to_string(),
        );

        // Content-MD5 and Content-Type (if body exists)
        if let Some(body_content) = body {
            if !body_content.is_empty() {
                headers.insert(
                    Self::CONTENT_MD5_HEADER.to_string(),
                    Self::md5_base64(body_content),
                );
                headers.insert(
                    Self::CONTENT_TYPE_HEADER.to_string(),
                    Self::CONTENT_TYPE_VALUE.to_string(),
                );
            }
        }

        // Host header
        headers.insert(Self::HOST_HEADER.to_string(), host.to_string());

        // x-acs-* headers; the nonce is a fresh UUID per request, so every
        // call yields a distinct signed header set.
        headers.insert(
            Self::X_ACS_SIGNATURE_METHOD.to_string(),
            Self::SIGNATURE_METHOD_VALUE.to_string(),
        );
        headers.insert(
            Self::X_ACS_SIGNATURE_NONCE.to_string(),
            Uuid::new_v4().to_string(),
        );
        headers.insert(
            Self::X_ACS_SIGNATURE_VERSION.to_string(),
            Self::SIGNATURE_VERSION_VALUE.to_string(),
        );
        headers.insert(
            Self::X_ACS_VERSION.to_string(),
            Self::API_VERSION.to_string(),
        );

        // Security token (if present)
        if let Some(token) = security_token {
            headers.insert(Self::X_ACS_SECURITY_TOKEN.to_string(), token.to_string());
        }

        headers
    }

    /// Sign with ROA v2: "acs AccessKeyId:Base64(HMAC-SHA1(string-to-sign))".
    fn authorization(
        &self,
        rest_auth_parameter: &RESTAuthParameter,
        token: &DLFToken,
        _host: &str,
        sign_headers: &HashMap<String, String>,
    ) -> String {
        let canonicalized_headers = self.build_canonicalized_headers(sign_headers);
        let canonicalized_resource = self.build_canonicalized_resource(rest_auth_parameter);
        let string_to_sign = self.build_string_to_sign(
            rest_auth_parameter,
            sign_headers,
            &canonicalized_headers,
            &canonicalized_resource,
        );

        let signature = Self::hmac_sha1_base64(&token.access_key_secret, &string_to_sign);
        format!("acs {}:{}", token.access_key_id, signature)
    }

    fn identifier(&self) -> &str {
        Self::IDENTIFIER
    }
}
+
+/// use paimon::api::auth::DLFSignerFactory;
+///
+/// // Auto-detect from URI
+/// let signer = DLFSignerFactory::create_signer("default", "cn-hangzhou");
+/// let algo = 
DLFSignerFactory::parse_signing_algo_from_uri(Some("http://dlfnext.ajinnbjug.aliyuncs.com";));
+/// assert_eq!(algo, "openapi");
+/// ```
+pub struct DLFSignerFactory;
+
+impl DLFSignerFactory {
+    /// Create a signer based on the signing algorithm.
+    pub fn create_signer(signing_algorithm: &str, region: &str) -> Box<dyn 
DLFRequestSigner> {
+        if signing_algorithm == DLFOpenApiSigner::IDENTIFIER {
+            Box::new(DLFOpenApiSigner)
+        } else {
+            Box::new(DLFDefaultSigner::new(region))
+        }
+    }
+}
diff --git a/crates/paimon/src/api/auth/factory.rs 
b/crates/paimon/src/api/auth/factory.rs
index 58234b2..68c2881 100644
--- a/crates/paimon/src/api/auth/factory.rs
+++ b/crates/paimon/src/api/auth/factory.rs
@@ -17,10 +17,98 @@
 
 //! Authentication provider factory.
 
+use crate::api::auth::dlf_provider::DLFTokenLoaderFactory;
+use crate::api::auth::{BearerTokenAuthProvider, DLFAuthProvider, DLFToken};
+use crate::api::AuthProvider;
 use crate::common::{CatalogOptions, Options};
 use crate::Error;
+use regex::Regex;
 
-use super::{AuthProvider, BearerTokenAuthProvider};
+/// Factory for creating DLF authentication providers.
+pub struct DLFAuthProviderFactory;
+
+impl DLFAuthProviderFactory {
+    /// OpenAPI identifier.
+    pub const OPENAPI_IDENTIFIER: &'static str = "openapi";
+    /// Default identifier.
+    pub const DEFAULT_IDENTIFIER: &'static str = "default";
+    /// Region pattern for parsing from URI.
+    const REGION_PATTERN: &'static str = r"(?:pre-)?([a-z]+-[a-z]+(?:-\d+)?)";
+
+    /// Parse region from DLF endpoint URI.
+    pub fn parse_region_from_uri(uri: Option<&str>) -> Option<String> {
+        let uri = uri?;
+        let re = Regex::new(Self::REGION_PATTERN).ok()?;
+        let caps = re.captures(uri)?;
+        caps.get(1).map(|m| m.as_str().to_string())
+    }
+
+    /// Parse signing algorithm from URI.
+    ///
+    /// Returns "openapi" for public endpoints (dlfnext in host),
+    /// otherwise returns "default".
+    pub fn parse_signing_algo_from_uri(uri: Option<&str>) -> &'static str {
+        if let Some(uri) = uri {
+            let host = uri.to_lowercase();
+            let host = host
+                .strip_prefix("http://";)
+                .unwrap_or(host.strip_prefix("https://";).unwrap_or(&host));
+            let host = host.split('/').next().unwrap_or("");
+            let host = host.split(':').next().unwrap_or("");
+
+            if host.starts_with("dlfnext") {
+                return Self::OPENAPI_IDENTIFIER;
+            }
+        }
+        Self::DEFAULT_IDENTIFIER
+    }
+
+    /// Create a DLF authentication provider from options.
+    ///
+    /// # Arguments
+    /// * `options` - The configuration options.
+    ///
+    /// # Returns
+    /// A boxed AuthProvider trait object.
+    ///
+    /// # Errors
+    /// Returns an error if required configuration is missing.
+    pub fn create_provider(options: &Options) -> Result<Box<dyn AuthProvider>, 
Error> {
+        let uri = options
+            .get(CatalogOptions::URI)
+            .ok_or_else(|| Error::ConfigInvalid {
+                message: "URI is required for DLF authentication".to_string(),
+            })?
+            .clone();
+
+        // Get region from options or parse from URI
+        let region = options
+            .get(CatalogOptions::DLF_REGION)
+            .cloned()
+            .or_else(|| Self::parse_region_from_uri(Some(&uri)))
+            .ok_or_else(|| Error::ConfigInvalid {
+                message: "Could not get region from config or URI. Please set 
'dlf.region' or use a standard DLF endpoint URI.".to_string(),
+            })?;
+
+        // Get signing algorithm from options, or auto-detect from URI
+        let signing_algorithm = options
+            .get(CatalogOptions::DLF_SIGNING_ALGORITHM)
+            .map(|s| s.as_str())
+            .filter(|s| *s != "default")
+            .unwrap_or_else(|| Self::parse_signing_algo_from_uri(Some(&uri)))
+            .to_string();
+
+        let dlf_provider = DLFAuthProvider::new(
+            uri,
+            region,
+            signing_algorithm,
+            DLFToken::from_options(options),
+            DLFTokenLoaderFactory::create_token_loader(options),
+        )?;
+
+        Ok(Box::new(dlf_provider))
+    }
+}
 
 /// Factory for creating authentication providers.
 pub struct AuthProviderFactory;
@@ -49,33 +137,39 @@ impl AuthProviderFactory {
                         })?;
                 Ok(Box::new(BearerTokenAuthProvider::new(token)))
             }
-            None => Err(Error::ConfigInvalid {
-                message: "auth provider is required".to_string(),
-            }),
+            Some("dlf") => DLFAuthProviderFactory::create_provider(options),
             Some(unknown) => Err(Error::ConfigInvalid {
                 message: format!("Unknown auth provider: {unknown}"),
             }),
+            None => Err(Error::ConfigInvalid {
+                message: "auth provider is required".to_string(),
+            }),
         }
     }
 }
 
 #[cfg(test)]
 mod tests {
+    use crate::api::auth::base::AUTHORIZATION_HEADER_KEY;
+
     use super::super::RESTAuthParameter;
     use super::*;
     use std::collections::HashMap;
 
-    #[test]
-    fn test_create_bearer_provider() {
+    #[tokio::test]
+    async fn test_create_bearer_provider() {
         let mut options = Options::new();
         options.set(CatalogOptions::TOKEN_PROVIDER, "bear");
         options.set(CatalogOptions::TOKEN, "test-token");
 
-        let provider = 
AuthProviderFactory::create_auth_provider(&options).unwrap();
+        let mut provider = 
AuthProviderFactory::create_auth_provider(&options).unwrap();
 
         let base_header = HashMap::new();
         let param = RESTAuthParameter::new("GET", "/test", None, 
HashMap::new());
-        let result = provider.merge_auth_header(base_header, &param);
+        let result = provider
+            .merge_auth_header(base_header, &param)
+            .await
+            .unwrap();
 
         assert_eq!(
             result.get("Authorization"),
@@ -98,4 +192,58 @@ mod tests {
         let result = AuthProviderFactory::create_auth_provider(&options);
         assert!(result.is_err());
     }
+
+    #[tokio::test]
+    async fn test_create_dlf_provider() {
+        let mut options = Options::new();
+        options.set(CatalogOptions::TOKEN_PROVIDER, "dlf");
+        options.set(CatalogOptions::URI, "http://dlf-asdaswfnb.net/";);
+        options.set(CatalogOptions::DLF_REGION, "cn-hangzhou");
+        options.set(CatalogOptions::DLF_ACCESS_KEY_ID, "test_key_id");
+        options.set(CatalogOptions::DLF_ACCESS_KEY_SECRET, "test_key_secret");
+
+        let mut provider = 
AuthProviderFactory::create_auth_provider(&options).unwrap();
+
+        let base_header = HashMap::new();
+        let param = RESTAuthParameter::new("GET", "/test", None, 
HashMap::new());
+        let result = provider
+            .merge_auth_header(base_header, &param)
+            .await
+            .unwrap();
+
+        assert!(result.contains_key(AUTHORIZATION_HEADER_KEY));
+    }
+
+    #[test]
+    fn test_dlf_provider_missing_region() {
+        let mut options = Options::new();
+        options.set(CatalogOptions::TOKEN_PROVIDER, "dlf");
+        options.set(CatalogOptions::URI, "http://example.com/";);
+        options.set(CatalogOptions::DLF_ACCESS_KEY_ID, "test_key_id");
+        options.set(CatalogOptions::DLF_ACCESS_KEY_SECRET, "test_key_secret");
+
+        let result = AuthProviderFactory::create_auth_provider(&options);
+        assert!(result.is_err());
+    }
+
+    #[test]
+    fn test_parse_region_from_uri() {
+        let region = DLFAuthProviderFactory::parse_region_from_uri(Some(
+            "http://cn-hangzhou-vpc.dlf.aliyuncs.com";,
+        ));
+        assert_eq!(region, Some("cn-hangzhou".to_string()));
+    }
+
+    #[test]
+    fn test_parse_signing_algo_from_uri() {
+        let algo = DLFAuthProviderFactory::parse_signing_algo_from_uri(Some(
+            "http://dlfnext.cn-hangzhou.aliyuncs.com";,
+        ));
+        assert_eq!(algo, "openapi");
+
+        let algo = DLFAuthProviderFactory::parse_signing_algo_from_uri(Some(
+            "http://cn-hangzhou-vpc.dlf.aliyuncs.com";,
+        ));
+        assert_eq!(algo, "default");
+    }
 }
diff --git a/crates/paimon/src/api/auth/mod.rs 
b/crates/paimon/src/api/auth/mod.rs
index 219d343..7afb9d8 100644
--- a/crates/paimon/src/api/auth/mod.rs
+++ b/crates/paimon/src/api/auth/mod.rs
@@ -18,9 +18,12 @@
 //! Authentication module for REST API.
 
 mod base;
-mod bear_provider;
+mod bearer_provider;
+mod dlf_provider;
+mod dlf_signer;
 mod factory;
 
 pub use base::{AuthProvider, RESTAuthFunction, RESTAuthParameter};
-pub use bear_provider::BearerTokenAuthProvider;
-pub use factory::AuthProviderFactory;
+pub use bearer_provider::BearerTokenAuthProvider;
+pub use dlf_provider::{DLFAuthProvider, DLFECSTokenLoader, DLFToken, 
DLFTokenLoader};
+pub use factory::{AuthProviderFactory, DLFAuthProviderFactory};
diff --git a/crates/paimon/src/api/mod.rs b/crates/paimon/src/api/mod.rs
index e1c205a..958323e 100644
--- a/crates/paimon/src/api/mod.rs
+++ b/crates/paimon/src/api/mod.rs
@@ -19,6 +19,7 @@
 //!
 //! This module provides REST API client, request, and response types.
 
+pub mod api_request;
 pub mod auth;
 pub mod resource_paths;
 pub mod rest_api;
@@ -28,9 +29,15 @@ pub mod rest_util;
 
 mod api_response;
 
+// Re-export request types
+pub use api_request::{
+    AlterDatabaseRequest, CreateDatabaseRequest, CreateTableRequest, 
RenameTableRequest,
+};
+
 // Re-export response types
 pub use api_response::{
-    ConfigResponse, ErrorResponse, ListDatabasesResponse, PagedList, 
RESTResponse,
+    AuditRESTResponse, ConfigResponse, ErrorResponse, GetDatabaseResponse, 
GetTableResponse,
+    ListDatabasesResponse, ListTablesResponse, PagedList,
 };
 
 // Re-export error types
diff --git a/crates/paimon/src/api/resource_paths.rs 
b/crates/paimon/src/api/resource_paths.rs
index 43577b8..1cbc88f 100644
--- a/crates/paimon/src/api/resource_paths.rs
+++ b/crates/paimon/src/api/resource_paths.rs
@@ -19,6 +19,8 @@
 
 use crate::common::{CatalogOptions, Options};
 
+use super::rest_util::RESTUtil;
+
 /// Resource paths for REST API endpoints.
 #[derive(Clone)]
 pub struct ResourcePaths {
@@ -28,6 +30,8 @@ pub struct ResourcePaths {
 impl ResourcePaths {
     const V1: &'static str = "v1";
     const DATABASES: &'static str = "databases";
+    const TABLES: &'static str = "tables";
+    const TABLE_DETAILS: &'static str = "table-details";
 
     /// Create a new ResourcePaths with the given prefix.
     pub fn new(prefix: &str) -> Self {
@@ -62,6 +66,71 @@ impl ResourcePaths {
     pub fn databases(&self) -> String {
         format!("{}/{}", self.base_path, Self::DATABASES)
     }
+
+    /// Get a specific database endpoint path.
+    pub fn database(&self, name: &str) -> String {
+        format!(
+            "{}/{}/{}",
+            self.base_path,
+            Self::DATABASES,
+            RESTUtil::encode_string(name)
+        )
+    }
+
+    /// Get the tables endpoint path.
+    pub fn tables(&self, database_name: Option<&str>) -> String {
+        if let Some(db_name) = database_name {
+            format!(
+                "{}/{}/{}/{}",
+                self.base_path,
+                Self::DATABASES,
+                RESTUtil::encode_string(db_name),
+                Self::TABLES
+            )
+        } else {
+            format!("{}/{}", self.base_path, Self::TABLES)
+        }
+    }
+
+    /// Get a specific table endpoint path.
+    pub fn table(&self, database_name: &str, table_name: &str) -> String {
+        format!(
+            "{}/{}/{}/{}/{}",
+            self.base_path,
+            Self::DATABASES,
+            RESTUtil::encode_string(database_name),
+            Self::TABLES,
+            RESTUtil::encode_string(table_name)
+        )
+    }
+
+    /// Get the table details endpoint path.
+    pub fn table_details(&self, database_name: &str) -> String {
+        format!(
+            "{}/{}/{}/{}",
+            self.base_path,
+            Self::DATABASES,
+            RESTUtil::encode_string(database_name),
+            Self::TABLE_DETAILS
+        )
+    }
+
+    /// Get the table token endpoint path.
+    pub fn table_token(&self, database_name: &str, table_name: &str) -> String 
{
+        format!(
+            "{}/{}/{}/{}/{}/token",
+            self.base_path,
+            Self::DATABASES,
+            RESTUtil::encode_string(database_name),
+            Self::TABLES,
+            RESTUtil::encode_string(table_name)
+        )
+    }
+
+    /// Get the rename table endpoint path.
+    pub fn rename_table(&self) -> String {
+        format!("{}/{}/rename", self.base_path, Self::TABLES)
+    }
 }
 
 #[cfg(test)]
@@ -72,12 +141,25 @@ mod tests {
     fn test_resource_paths_basic() {
         let paths = ResourcePaths::new("");
         assert_eq!(paths.databases(), "/v1/databases");
+        assert_eq!(paths.tables(None), "/v1/tables");
     }
 
     #[test]
     fn test_resource_paths_with_prefix() {
         let paths = ResourcePaths::new("my-catalog");
         assert_eq!(paths.databases(), "/v1/my-catalog/databases");
+        assert_eq!(
+            paths.database("test-db"),
+            "/v1/my-catalog/databases/test-db"
+        );
+    }
+
+    #[test]
+    fn test_resource_paths_table() {
+        let paths = ResourcePaths::new("");
+        let table_path = paths.table("my-db", "my-table");
+        assert!(table_path.contains("my-db"));
+        assert!(table_path.contains("my-table"));
     }
 
     #[test]
diff --git a/crates/paimon/src/api/rest_api.rs 
b/crates/paimon/src/api/rest_api.rs
index d975d99..f785111 100644
--- a/crates/paimon/src/api/rest_api.rs
+++ b/crates/paimon/src/api/rest_api.rs
@@ -23,17 +23,56 @@
 use std::collections::HashMap;
 
 use crate::api::rest_client::HttpClient;
+use crate::catalog::Identifier;
 use crate::common::{CatalogOptions, Options};
+use crate::spec::Schema;
 use crate::Result;
 
-use super::api_response::{ConfigResponse, ListDatabasesResponse, PagedList};
+use super::api_request::{
+    AlterDatabaseRequest, CreateDatabaseRequest, CreateTableRequest, 
RenameTableRequest,
+};
+use super::api_response::{
+    ConfigResponse, GetDatabaseResponse, GetTableResponse, 
ListDatabasesResponse,
+    ListTablesResponse, PagedList,
+};
 use super::auth::{AuthProviderFactory, RESTAuthFunction};
 use super::resource_paths::ResourcePaths;
 use super::rest_util::RESTUtil;
 
+/// Validate that a string is not empty after trimming.
+///
+/// # Arguments
+/// * `value` - The string to validate.
+/// * `field_name` - The name of the field for error messages.
+///
+/// # Returns
+/// `Ok(())` if valid, `Err` if empty.
+fn validate_non_empty(value: &str, field_name: &str) -> Result<()> {
+    if value.trim().is_empty() {
+        return Err(crate::Error::ConfigInvalid {
+            message: format!("{} cannot be empty", field_name),
+        });
+    }
+    Ok(())
+}
+
+/// Validate that multiple strings are not empty after trimming.
+///
+/// # Arguments
+/// * `values` - Slice of (value, field_name) pairs to validate.
+///
+/// # Returns
+/// `Ok(())` if all valid, `Err` if any is empty.
+fn validate_non_empty_multi(values: &[(&str, &str)]) -> Result<()> {
+    for (value, field_name) in values {
+        validate_non_empty(value, field_name)?;
+    }
+    Ok(())
+}
+
 /// REST API wrapper for Paimon catalog operations.
 ///
-/// This struct provides methods for database CRUD operations
+/// This struct provides methods for database and table CRUD operations
 /// through a REST API client.
 pub struct RESTApi {
     client: HttpClient,
@@ -48,6 +87,8 @@ impl RESTApi {
     pub const MAX_RESULTS: &'static str = "maxResults";
     pub const PAGE_TOKEN: &'static str = "pageToken";
     pub const DATABASE_NAME_PATTERN: &'static str = "databaseNamePattern";
+    pub const TABLE_NAME_PATTERN: &'static str = "tableNamePattern";
+    pub const TABLE_TYPE: &'static str = "tableType";
 
     /// Create a new RESTApi from options.
     ///
@@ -99,7 +140,7 @@ impl RESTApi {
                 RESTUtil::encode_string(warehouse),
             )];
             let config_response: ConfigResponse = client
-                .get_with_params(&ResourcePaths::config(), &query_params)
+                .get(&ResourcePaths::config(), Some(&query_params))
                 .await?;
 
             // Merge config response with options (client config takes 
priority)
@@ -130,7 +171,7 @@ impl RESTApi {
     // ==================== Database Operations ====================
 
     /// List all databases.
-    pub async fn list_databases(&self) -> Result<Vec<String>> {
+    pub async fn list_databases(&mut self) -> Result<Vec<String>> {
         let mut results = Vec::new();
         let mut page_token: Option<String> = None;
 
@@ -151,7 +192,7 @@ impl RESTApi {
 
     /// List databases with pagination.
     pub async fn list_databases_paged(
-        &self,
+        &mut self,
         max_results: Option<u32>,
         page_token: Option<&str>,
         database_name_pattern: Option<&str>,
@@ -172,11 +213,166 @@ impl RESTApi {
         }
 
         let response: ListDatabasesResponse = if params.is_empty() {
-            self.client.get(&path).await?
+            self.client.get(&path, None::<&[(&str, &str)]>).await?
         } else {
-            self.client.get_with_params(&path, &params).await?
+            self.client.get(&path, Some(&params)).await?
         };
 
         Ok(PagedList::new(response.databases, response.next_page_token))
     }
+
+    /// Create a new database.
+    pub async fn create_database(
+        &mut self,
+        name: &str,
+        options: Option<std::collections::HashMap<String, String>>,
+    ) -> Result<()> {
+        validate_non_empty(name, "database name")?;
+        let path = self.resource_paths.databases();
+        let request = CreateDatabaseRequest::new(name.to_string(), 
options.unwrap_or_default());
+        let _resp: serde_json::Value = self.client.post(&path, 
&request).await?;
+        Ok(())
+    }
+
+    /// Get database information.
+    pub async fn get_database(&mut self, name: &str) -> 
Result<GetDatabaseResponse> {
+        validate_non_empty(name, "database name")?;
+        let path = self.resource_paths.database(name);
+        self.client.get(&path, None::<&[(&str, &str)]>).await
+    }
+
+    /// Alter database configuration.
+    pub async fn alter_database(
+        &mut self,
+        name: &str,
+        removals: Vec<String>,
+        updates: std::collections::HashMap<String, String>,
+    ) -> Result<()> {
+        validate_non_empty(name, "database name")?;
+        let path = self.resource_paths.database(name);
+        let request = AlterDatabaseRequest::new(removals, updates);
+        let _resp: serde_json::Value = self.client.post(&path, 
&request).await?;
+        Ok(())
+    }
+
+    /// Drop a database.
+    pub async fn drop_database(&mut self, name: &str) -> Result<()> {
+        validate_non_empty(name, "database name")?;
+        let path = self.resource_paths.database(name);
+        let _resp: serde_json::Value = self.client.delete(&path, 
None::<&[(&str, &str)]>).await?;
+        Ok(())
+    }
+
+    // ==================== Table Operations ====================
+
+    /// List all tables in a database.
+    pub async fn list_tables(&mut self, database: &str) -> Result<Vec<String>> 
{
+        validate_non_empty(database, "database name")?;
+
+        let mut results = Vec::new();
+        let mut page_token: Option<String> = None;
+
+        loop {
+            let paged = self
+                .list_tables_paged(database, None, page_token.as_deref(), 
None, None)
+                .await?;
+            let is_empty = paged.elements.is_empty();
+            results.extend(paged.elements);
+            page_token = paged.next_page_token;
+            if page_token.is_none() || is_empty {
+                break;
+            }
+        }
+
+        Ok(results)
+    }
+
+    /// List tables with pagination.
+    pub async fn list_tables_paged(
+        &mut self,
+        database: &str,
+        max_results: Option<u32>,
+        page_token: Option<&str>,
+        table_name_pattern: Option<&str>,
+        table_type: Option<&str>,
+    ) -> Result<PagedList<String>> {
+        validate_non_empty(database, "database name")?;
+        let path = self.resource_paths.tables(Some(database));
+        let mut params: Vec<(&str, String)> = Vec::new();
+
+        if let Some(max) = max_results {
+            params.push((Self::MAX_RESULTS, max.to_string()));
+        }
+
+        if let Some(token) = page_token {
+            params.push((Self::PAGE_TOKEN, token.to_string()));
+        }
+
+        if let Some(pattern) = table_name_pattern {
+            params.push((Self::TABLE_NAME_PATTERN, pattern.to_string()));
+        }
+
+        if let Some(ttype) = table_type {
+            params.push((Self::TABLE_TYPE, ttype.to_string()));
+        }
+
+        let response: ListTablesResponse = if params.is_empty() {
+            self.client.get(&path, None::<&[(&str, &str)]>).await?
+        } else {
+            self.client.get(&path, Some(&params)).await?
+        };
+
+        Ok(PagedList::new(
+            response.tables.unwrap_or_default(),
+            response.next_page_token,
+        ))
+    }
+
+    /// Create a new table.
+    pub async fn create_table(&mut self, identifier: &Identifier, schema: 
Schema) -> Result<()> {
+        let database = identifier.database();
+        let table = identifier.object();
+        validate_non_empty_multi(&[(database, "database name"), (table, "table 
name")])?;
+        let path = self.resource_paths.tables(Some(database));
+        let request = CreateTableRequest::new(identifier.clone(), schema);
+        let _resp: serde_json::Value = self.client.post(&path, 
&request).await?;
+        Ok(())
+    }
+
+    /// Get table information.
+    pub async fn get_table(&mut self, identifier: &Identifier) -> 
Result<GetTableResponse> {
+        let database = identifier.database();
+        let table = identifier.object();
+        validate_non_empty_multi(&[(database, "database name"), (table, "table 
name")])?;
+        let path = self.resource_paths.table(database, table);
+        self.client.get(&path, None::<&[(&str, &str)]>).await
+    }
+
+    /// Rename a table.
+    pub async fn rename_table(
+        &mut self,
+        source: &Identifier,
+        destination: &Identifier,
+    ) -> Result<()> {
+        validate_non_empty_multi(&[
+            (source.database(), "source database name"),
+            (source.object(), "source table name"),
+            (destination.database(), "destination database name"),
+            (destination.object(), "destination table name"),
+        ])?;
+        let path = self.resource_paths.rename_table();
+        let request = RenameTableRequest::new(source.clone(), 
destination.clone());
+        let _resp: serde_json::Value = self.client.post(&path, 
&request).await?;
+        Ok(())
+    }
+
+    /// Drop a table.
+    pub async fn drop_table(&mut self, identifier: &Identifier) -> Result<()> {
+        let database = identifier.database();
+        let table = identifier.object();
+        validate_non_empty_multi(&[(database, "database name"), (table, "table 
name")])?;
+        let path = self.resource_paths.table(database, table);
+        let _resp: serde_json::Value = self.client.delete(&path, 
None::<&[(&str, &str)]>).await?;
+        Ok(())
+    }
 }
diff --git a/crates/paimon/src/api/rest_client.rs 
b/crates/paimon/src/api/rest_client.rs
index b348918..76db48f 100644
--- a/crates/paimon/src/api/rest_client.rs
+++ b/crates/paimon/src/api/rest_client.rs
@@ -85,17 +85,40 @@ impl HttpClient {
         Ok(normalized_url.trim_end_matches('/').to_string())
     }
 
-    /// Perform a GET request and parse the response as JSON.
+    /// Perform a GET request with optional query parameters.
     ///
     /// # Arguments
     /// * `path` - The path to append to the base URL.
+    /// * `params` - Optional query parameters as key-value pairs.
     ///
     /// # Returns
     /// The parsed JSON response.
-    pub async fn get<T: DeserializeOwned>(&self, path: &str) -> Result<T> {
+    pub async fn get<T: DeserializeOwned>(
+        &mut self,
+        path: &str,
+        params: Option<&[(impl AsRef<str>, impl AsRef<str>)]>,
+    ) -> Result<T> {
         let url = self.request_url(path);
-        let headers = self.build_auth_headers("GET", path, None, 
HashMap::new());
-        let request = self.client.get(&url);
+
+        let params_map: HashMap<String, String> = match params {
+            Some(p) => p
+                .iter()
+                .map(|(k, v)| (k.as_ref().to_string(), v.as_ref().to_string()))
+                .collect(),
+            None => HashMap::new(),
+        };
+
+        let headers = self
+            .build_auth_headers("GET", path, None, params_map)
+            .await?;
+
+        let mut request = self.client.get(&url);
+        if let Some(p) = params {
+            for (key, value) in p {
+                request = request.query(&[(key.as_ref(), value.as_ref())]);
+            }
+        }
+
         let request = Self::apply_headers(request, &headers);
         let resp = request.send().await.map_err(|e| Error::UnexpectedError {
             message: "http get failed".to_string(),
@@ -104,34 +127,70 @@ impl HttpClient {
         self.parse_response(resp).await
     }
 
-    /// Perform a GET request with query parameters.
+    /// Perform a POST request with a JSON body.
     ///
     /// # Arguments
     /// * `path` - The path to append to the base URL.
-    /// * `params` - Query parameters as key-value pairs (supports both `&str` 
and `String`).
+    /// * `body` - The JSON body to send.
     ///
     /// # Returns
     /// The parsed JSON response.
-    pub async fn get_with_params<T: DeserializeOwned>(
-        &self,
+    pub async fn post<T: DeserializeOwned, B: serde::Serialize>(
+        &mut self,
         path: &str,
-        params: &[(impl AsRef<str>, impl AsRef<str>)],
+        body: &B,
     ) -> Result<T> {
         let url = self.request_url(path);
-        let params_map: HashMap<String, String> = params
-            .iter()
-            .map(|(k, v)| (k.as_ref().to_string(), v.as_ref().to_string()))
-            .collect();
-        let headers = self.build_auth_headers("GET", path, None, 
params_map.clone());
+        let body_str = serde_json::to_string(body).ok();
+        let headers = self
+            .build_auth_headers("POST", path, body_str.as_deref(), 
HashMap::new())
+            .await?;
+        let request = self.client.post(&url).json(body);
+        let request = Self::apply_headers(request, &headers);
+        let resp = request.send().await.map_err(|e| Error::UnexpectedError {
+            message: "http post failed".to_string(),
+            source: Some(Box::new(e)),
+        })?;
+        self.parse_response(resp).await
+    }
 
-        let mut request = self.client.get(&url);
-        for (key, value) in params {
-            request = request.query(&[(key.as_ref(), value.as_ref())]);
+    /// Perform a DELETE request with optional query parameters.
+    ///
+    /// # Arguments
+    /// * `path` - The path to append to the base URL.
+    /// * `params` - Optional query parameters as key-value pairs.
+    ///
+    /// # Returns
+    /// The parsed JSON response.
+    pub async fn delete<T: DeserializeOwned>(
+        &mut self,
+        path: &str,
+        params: Option<&[(impl AsRef<str>, impl AsRef<str>)]>,
+    ) -> Result<T> {
+        let url = self.request_url(path);
+
+        let params_map: HashMap<String, String> = match params {
+            Some(p) => p
+                .iter()
+                .map(|(k, v)| (k.as_ref().to_string(), v.as_ref().to_string()))
+                .collect(),
+            None => HashMap::new(),
+        };
+
+        let headers = self
+            .build_auth_headers("DELETE", path, None, params_map)
+            .await?;
+
+        let mut request = self.client.delete(&url);
+        if let Some(p) = params {
+            for (key, value) in p {
+                request = request.query(&[(key.as_ref(), value.as_ref())]);
+            }
         }
 
         let request = Self::apply_headers(request, &headers);
         let resp = request.send().await.map_err(|e| Error::UnexpectedError {
-            message: "http get failed".to_string(),
+            message: "http delete failed".to_string(),
             source: Some(Box::new(e)),
         })?;
         self.parse_response(resp).await
@@ -143,19 +202,19 @@ impl HttpClient {
     }
 
     /// Build auth headers for a request.
-    fn build_auth_headers(
-        &self,
+    async fn build_auth_headers(
+        &mut self,
         method: &str,
         path: &str,
         data: Option<&str>,
         params: HashMap<String, String>,
-    ) -> HashMap<String, String> {
-        if let Some(ref auth_fn) = self.auth_function {
+    ) -> Result<HashMap<String, String>> {
+        if let Some(ref mut auth_fn) = self.auth_function {
             let parameter =
                 RESTAuthParameter::new(method, path, data.map(|s| 
s.to_string()), params);
-            auth_fn.apply(&parameter)
+            auth_fn.apply(&parameter).await
         } else {
-            HashMap::new()
+            Ok(HashMap::new())
         }
     }
 
@@ -203,6 +262,14 @@ impl HttpClient {
             source: Some(Box::new(e)),
         })?;
 
+        // Handle empty response body - return null as default for types like 
serde_json::Value
+        if text.trim().is_empty() {
+            return serde_json::from_str("null").map_err(|e| 
Error::UnexpectedError {
+                message: "failed to parse empty response".to_string(),
+                source: Some(Box::new(e)),
+            });
+        }
+
         serde_json::from_str(&text).map_err(|e| Error::UnexpectedError {
             message: "failed to parse json".to_string(),
             source: Some(Box::new(e)),
diff --git a/crates/paimon/src/common/options.rs 
b/crates/paimon/src/common/options.rs
index a6adf69..a469a07 100644
--- a/crates/paimon/src/common/options.rs
+++ b/crates/paimon/src/common/options.rs
@@ -38,8 +38,37 @@ impl CatalogOptions {
     /// Authentication token.
     pub const TOKEN: &'static str = "token";
 
+    /// Data token enabled flag.
+    pub const DATA_TOKEN_ENABLED: &'static str = "data-token.enabled";
+
     /// Prefix for catalog resources.
     pub const PREFIX: &'static str = "prefix";
+
+    // DLF (Data Lake Formation) configuration options
+
+    /// DLF region.
+    pub const DLF_REGION: &'static str = "dlf.region";
+
+    /// DLF access key ID.
+    pub const DLF_ACCESS_KEY_ID: &'static str = "dlf.access-key-id";
+
+    /// DLF access key secret.
+    pub const DLF_ACCESS_KEY_SECRET: &'static str = "dlf.access-key-secret";
+
+    /// DLF security token (optional, for temporary credentials).
+    pub const DLF_ACCESS_SECURITY_TOKEN: &'static str = "dlf.security-token";
+
+    /// DLF signing algorithm (default or openapi).
+    pub const DLF_SIGNING_ALGORITHM: &'static str = "dlf.signing-algorithm";
+
+    /// DLF token loader type (e.g., "ecs").
+    pub const DLF_TOKEN_LOADER: &'static str = "dlf.token-loader";
+
+    /// DLF ECS metadata URL.
+    pub const DLF_TOKEN_ECS_METADATA_URL: &'static str = 
"dlf.token-ecs-metadata-url";
+
+    /// DLF ECS role name.
+    pub const DLF_TOKEN_ECS_ROLE_NAME: &'static str = 
"dlf.token-ecs-role-name";
 }
 
 /// Configuration options container.
diff --git a/crates/paimon/tests/mock_server.rs 
b/crates/paimon/tests/mock_server.rs
index 32ced4d..fee16c7 100644
--- a/crates/paimon/tests/mock_server.rs
+++ b/crates/paimon/tests/mock_server.rs
@@ -21,22 +21,33 @@
 //! for testing purposes.
 
 use axum::{
-    extract::{Extension, Json, Query},
+    extract::{Extension, Json, Path, Query},
     http::StatusCode,
     response::IntoResponse,
     routing::get,
     Router,
 };
-use std::collections::HashMap;
+use serde_json::json;
+use std::collections::{HashMap, HashSet};
 use std::net::SocketAddr;
 use std::sync::{Arc, Mutex};
 use tokio::task::JoinHandle;
 
-use paimon::api::{ConfigResponse, ErrorResponse, ListDatabasesResponse, 
ResourcePaths};
+use paimon::api::{
+    AlterDatabaseRequest, AuditRESTResponse, ConfigResponse, ErrorResponse, 
GetDatabaseResponse,
+    GetTableResponse, ListDatabasesResponse, ListTablesResponse, 
RenameTableRequest, ResourcePaths,
+};
 
 #[derive(Clone, Debug, Default)]
 struct MockState {
-    databases: HashMap<String, ()>,
+    databases: HashMap<String, GetDatabaseResponse>,
+    tables: HashMap<String, GetTableResponse>,
+    no_permission_databases: HashSet<String>,
+    no_permission_tables: HashSet<String>,
+    /// ECS metadata role name (for token loader testing)
+    ecs_role_name: Option<String>,
+    /// ECS metadata token (for token loader testing)
+    ecs_token: Option<serde_json::Value>,
 }
 
 #[derive(Clone)]
@@ -52,7 +63,7 @@ pub struct RESTServer {
 }
 
 impl RESTServer {
-    /// Create a new RESTServer with initial databases (backward 
compatibility).
+    /// Create a new RESTServer with initial databases.
     pub fn new(
         warehouse: String,
         data_path: String,
@@ -62,14 +73,28 @@ impl RESTServer {
         let prefix = 
config.defaults.get("prefix").cloned().unwrap_or_default();
 
         // Create database set for initial databases
-        let databases: HashMap<String, ()> =
-            initial_dbs.into_iter().map(|name| (name, ())).collect();
+        let databases: HashMap<String, GetDatabaseResponse> = initial_dbs
+            .into_iter()
+            .map(|name| {
+                let response = GetDatabaseResponse::new(
+                    Some(name.clone()),
+                    Some(name.clone()),
+                    None,
+                    HashMap::new(),
+                    AuditRESTResponse::new(None, None, None, None, None),
+                );
+                (name, response)
+            })
+            .collect();
 
         RESTServer {
             data_path,
             config,
             warehouse,
-            inner: Arc::new(Mutex::new(MockState { databases })),
+            inner: Arc::new(Mutex::new(MockState {
+                databases,
+                ..Default::default()
+            })),
             resource_paths: ResourcePaths::new(&prefix),
             addr: None,
             server_handle: None,
@@ -90,7 +115,7 @@ impl RESTServer {
                 let err = ErrorResponse::new(
                     None,
                     None,
-                    Some(format!("Warehouse {warehouse} not found")),
+                    Some(format!("Warehouse {} not found", warehouse)),
                     Some(404),
                 );
                 return (StatusCode::NOT_FOUND, Json(err)).into_response();
@@ -107,37 +132,516 @@ impl RESTServer {
         let response = ListDatabasesResponse::new(dbs, None);
         (StatusCode::OK, Json(response))
     }
+    /// Handle POST /databases - create a new database.
+    pub async fn create_database(
+        Extension(state): Extension<Arc<RESTServer>>,
+        Json(payload): Json<serde_json::Value>,
+    ) -> impl IntoResponse {
+        let name = match payload.get("name").and_then(|n| n.as_str()) {
+            Some(n) => n.to_string(),
+            None => {
+                let err =
+                    ErrorResponse::new(None, None, Some("Missing 
name".to_string()), Some(400));
+                return (StatusCode::BAD_REQUEST, Json(err)).into_response();
+            }
+        };
 
-    // ====================== Server Control ====================
-    /// Get the warehouse path.
-    #[allow(dead_code)]
-    pub fn warehouse(&self) -> &str {
-        &self.warehouse
+        let mut s = state.inner.lock().unwrap();
+        if let std::collections::hash_map::Entry::Vacant(e) = 
s.databases.entry(name.clone()) {
+            let response = GetDatabaseResponse::new(
+                Some(name.clone()),
+                Some(name.clone()),
+                None,
+                HashMap::new(),
+                AuditRESTResponse::new(None, None, None, None, None),
+            );
+            e.insert(response);
+            (StatusCode::OK, Json(serde_json::json!(""))).into_response()
+        } else {
+            let err = ErrorResponse::new(
+                Some("database".to_string()),
+                Some(name),
+                Some("Already Exists".to_string()),
+                Some(409),
+            );
+            (StatusCode::CONFLICT, Json(err)).into_response()
+        }
     }
+    /// Handle GET /databases/:name - get a specific database.
+    pub async fn get_database(
+        Path(name): Path<String>,
+        Extension(state): Extension<Arc<RESTServer>>,
+    ) -> impl IntoResponse {
+        let s = state.inner.lock().unwrap();
 
-    /// Get the resource paths.
-    pub fn resource_paths(&self) -> &ResourcePaths {
-        &self.resource_paths
+        if s.no_permission_databases.contains(&name) {
+            let err = ErrorResponse::new(
+                Some("database".to_string()),
+                Some(name.clone()),
+                Some("No Permission".to_string()),
+                Some(403),
+            );
+            return (StatusCode::FORBIDDEN, Json(err)).into_response();
+        }
+
+        if let Some(response) = s.databases.get(&name) {
+            (StatusCode::OK, Json(response.clone())).into_response()
+        } else {
+            let err = ErrorResponse::new(
+                Some("database".to_string()),
+                Some(name.clone()),
+                Some("Not Found".to_string()),
+                Some(404),
+            );
+            (StatusCode::NOT_FOUND, Json(err)).into_response()
+        }
+    }
+
+    /// Handle POST /databases/:name - alter database configuration.
+    pub async fn alter_database(
+        Path(name): Path<String>,
+        Extension(state): Extension<Arc<RESTServer>>,
+        Json(request): Json<AlterDatabaseRequest>,
+    ) -> impl IntoResponse {
+        let mut s = state.inner.lock().unwrap();
+
+        if s.no_permission_databases.contains(&name) {
+            let err = ErrorResponse::new(
+                Some("database".to_string()),
+                Some(name.clone()),
+                Some("No Permission".to_string()),
+                Some(403),
+            );
+            return (StatusCode::FORBIDDEN, Json(err)).into_response();
+        }
+
+        if let Some(response) = s.databases.get_mut(&name) {
+            // Apply removals
+            for key in &request.removals {
+                response.options.remove(key);
+            }
+            // Apply updates
+            response.options.extend(request.updates);
+            (StatusCode::OK, Json(serde_json::json!(""))).into_response()
+        } else {
+            let err = ErrorResponse::new(
+                Some("database".to_string()),
+                Some(name.clone()),
+                Some("Not Found".to_string()),
+                Some(404),
+            );
+            (StatusCode::NOT_FOUND, Json(err)).into_response()
+        }
+    }
+
+    /// Handle DELETE /databases/:name - drop a database.
+    pub async fn drop_database(
+        Path(name): Path<String>,
+        Extension(state): Extension<Arc<RESTServer>>,
+    ) -> impl IntoResponse {
+        let mut s = state.inner.lock().unwrap();
+
+        if s.no_permission_databases.contains(&name) {
+            let err = ErrorResponse::new(
+                Some("database".to_string()),
+                Some(name.clone()),
+                Some("No Permission".to_string()),
+                Some(403),
+            );
+            return (StatusCode::FORBIDDEN, Json(err)).into_response();
+        }
+
+        if s.databases.remove(&name).is_some() {
+            // Also remove all tables in this database
+            let prefix = format!("{}.", name);
+            s.tables.retain(|key, _| !key.starts_with(&prefix));
+            s.no_permission_tables
+                .retain(|key| !key.starts_with(&prefix));
+            (StatusCode::OK, Json(serde_json::json!(""))).into_response()
+        } else {
+            let err = ErrorResponse::new(
+                Some("database".to_string()),
+                Some(name.clone()),
+                Some("Not Found".to_string()),
+                Some(404),
+            );
+            (StatusCode::NOT_FOUND, Json(err)).into_response()
+        }
+    }
+
+    /// Handle GET /databases/:db/tables - list all tables in a database.
+    pub async fn list_tables(
+        Path(db): Path<String>,
+        Extension(state): Extension<Arc<RESTServer>>,
+    ) -> impl IntoResponse {
+        let s = state.inner.lock().unwrap();
+
+        if s.no_permission_databases.contains(&db) {
+            let err = ErrorResponse::new(
+                Some("database".to_string()),
+                Some(db.clone()),
+                Some("No Permission".to_string()),
+                Some(403),
+            );
+            return (StatusCode::FORBIDDEN, Json(err)).into_response();
+        }
+
+        if !s.databases.contains_key(&db) {
+            let err = ErrorResponse::new(
+                Some("database".to_string()),
+                Some(db.clone()),
+                Some("Not Found".to_string()),
+                Some(404),
+            );
+            return (StatusCode::NOT_FOUND, Json(err)).into_response();
+        }
+
+        let prefix = format!("{}.", db);
+        let mut tables: Vec<String> = s
+            .tables
+            .keys()
+            .filter_map(|key| {
+                if key.starts_with(&prefix) {
+                    Some(key[prefix.len()..].to_string())
+                } else {
+                    None
+                }
+            })
+            .collect();
+        tables.sort();
+
+        let response = ListTablesResponse::new(Some(tables), None);
+        (StatusCode::OK, Json(response)).into_response()
+    }
+
+    /// Handle POST /databases/:db/tables - create a new table.
+    pub async fn create_table(
+        Path(db): Path<String>,
+        Extension(state): Extension<Arc<RESTServer>>,
+        Json(payload): Json<serde_json::Value>,
+    ) -> impl IntoResponse {
+        // Extract table name from payload
+        let table_name = payload
+            .get("identifier")
+            .and_then(|id| id.get("object"))
+            .and_then(|o| o.as_str())
+            .map(|s| s.to_string());
+
+        let table_name = match table_name {
+            Some(name) => name,
+            None => {
+                let err = ErrorResponse::new(
+                    None,
+                    None,
+                    Some("Missing table name in identifier".to_string()),
+                    Some(400),
+                );
+                return (StatusCode::BAD_REQUEST, Json(err)).into_response();
+            }
+        };
+
+        let mut s = state.inner.lock().unwrap();
+
+        // Check database exists
+        if !s.databases.contains_key(&db) {
+            let err = ErrorResponse::new(
+                Some("database".to_string()),
+                Some(db.clone()),
+                Some("Not Found".to_string()),
+                Some(404),
+            );
+            return (StatusCode::NOT_FOUND, Json(err)).into_response();
+        }
+
+        let key = format!("{}.{}", db, table_name);
+        if s.tables.contains_key(&key) {
+            let err = ErrorResponse::new(
+                Some("table".to_string()),
+                Some(table_name),
+                Some("Already Exists".to_string()),
+                Some(409),
+            );
+            return (StatusCode::CONFLICT, Json(err)).into_response();
+        }
+
+        // Create table response
+        let response = GetTableResponse::new(
+            Some(table_name.clone()),
+            Some(table_name),
+            None,
+            Some(true),
+            None,
+            None,
+            AuditRESTResponse::new(None, None, None, None, None),
+        );
+        s.tables.insert(key, response);
+        (StatusCode::OK, Json(serde_json::json!(""))).into_response()
+    }
+
+    /// Handle GET /databases/:db/tables/:table - get a specific table.
+    pub async fn get_table(
+        Path((db, table)): Path<(String, String)>,
+        Extension(state): Extension<Arc<RESTServer>>,
+    ) -> impl IntoResponse {
+        let s = state.inner.lock().unwrap();
+
+        let key = format!("{}.{}", db, table);
+        if s.no_permission_tables.contains(&key) {
+            let err = ErrorResponse::new(
+                Some("table".to_string()),
+                Some(table.clone()),
+                Some("No Permission".to_string()),
+                Some(403),
+            );
+            return (StatusCode::FORBIDDEN, Json(err)).into_response();
+        }
+
+        if let Some(response) = s.tables.get(&key) {
+            return (StatusCode::OK, Json(response.clone())).into_response();
+        }
+
+        if !s.databases.contains_key(&db) {
+            let err = ErrorResponse::new(
+                Some("database".to_string()),
+                Some(db),
+                Some("Not Found".to_string()),
+                Some(404),
+            );
+            return (StatusCode::NOT_FOUND, Json(err)).into_response();
+        }
+
+        let err = ErrorResponse::new(
+            Some("table".to_string()),
+            Some(table),
+            Some("Not Found".to_string()),
+            Some(404),
+        );
+        (StatusCode::NOT_FOUND, Json(err)).into_response()
     }
 
+    /// Handle DELETE /databases/:db/tables/:table - drop a table.
+    pub async fn drop_table(
+        Path((db, table)): Path<(String, String)>,
+        Extension(state): Extension<Arc<RESTServer>>,
+    ) -> impl IntoResponse {
+        let mut s = state.inner.lock().unwrap();
+
+        let key = format!("{}.{}", db, table);
+        if s.no_permission_tables.contains(&key) {
+            let err = ErrorResponse::new(
+                Some("table".to_string()),
+                Some(table.clone()),
+                Some("No Permission".to_string()),
+                Some(403),
+            );
+            return (StatusCode::FORBIDDEN, Json(err)).into_response();
+        }
+
+        if s.tables.remove(&key).is_some() {
+            s.no_permission_tables.remove(&key);
+            (StatusCode::OK, Json(serde_json::json!(""))).into_response()
+        } else {
+            let err = ErrorResponse::new(
+                Some("table".to_string()),
+                Some(table),
+                Some("Not Found".to_string()),
+                Some(404),
+            );
+            (StatusCode::NOT_FOUND, Json(err)).into_response()
+        }
+    }
+
+    /// Handle POST /tables/rename - rename a table.
+    pub async fn rename_table(
+        Extension(state): Extension<Arc<RESTServer>>,
+        Json(request): Json<RenameTableRequest>,
+    ) -> impl IntoResponse {
+        let mut s = state.inner.lock().unwrap();
+
+        let source_key = format!("{}.{}", request.source.database(), 
request.source.object());
+        let dest_key = format!(
+            "{}.{}",
+            request.destination.database(),
+            request.destination.object()
+        );
+
+        // Check source table permission
+        if s.no_permission_tables.contains(&source_key) {
+            let err = ErrorResponse::new(
+                Some("table".to_string()),
+                Some(request.source.object().to_string()),
+                Some("No Permission".to_string()),
+                Some(403),
+            );
+            return (StatusCode::FORBIDDEN, Json(err)).into_response();
+        }
+
+        // Check if source table exists
+        if let Some(table_response) = s.tables.remove(&source_key) {
+            // Check if destination already exists
+            if s.tables.contains_key(&dest_key) {
+                // Restore source table
+                s.tables.insert(source_key, table_response);
+                let err = ErrorResponse::new(
+                    Some("table".to_string()),
+                    Some(dest_key.clone()),
+                    Some("Already Exists".to_string()),
+                    Some(409),
+                );
+                return (StatusCode::CONFLICT, Json(err)).into_response();
+            }
+
+            // Update the table name in response and insert at new location
+            let new_table_response = GetTableResponse::new(
+                Some(request.destination.object().to_string()),
+                Some(request.destination.object().to_string()),
+                table_response.path,
+                table_response.is_external,
+                table_response.schema_id,
+                table_response.schema,
+                table_response.audit,
+            );
+            s.tables.insert(dest_key.clone(), new_table_response);
+
+            // Update permission tracking if needed
+            if s.no_permission_tables.remove(&source_key) {
+                s.no_permission_tables.insert(dest_key.clone());
+            }
+
+            (StatusCode::OK, Json(serde_json::json!(""))).into_response()
+        } else {
+            let err = ErrorResponse::new(
+                Some("table".to_string()),
+                Some(source_key),
+                Some("Not Found".to_string()),
+                Some(404),
+            );
+            (StatusCode::NOT_FOUND, Json(err)).into_response()
+        }
+    }
+    // ====================== Server Control ====================
     /// Add a database to the server state.
+    #[allow(dead_code)]
     pub fn add_database(&self, name: &str) {
         let mut s = self.inner.lock().unwrap();
-        if !s.databases.contains_key(name) {
-            s.databases.insert(name.to_string(), ());
-        }
+        s.databases.entry(name.to_string()).or_insert_with(|| {
+            GetDatabaseResponse::new(
+                Some(name.to_string()),
+                Some(name.to_string()),
+                None,
+                HashMap::new(),
+                AuditRESTResponse::new(None, None, None, None, None),
+            )
+        });
+    }
+    /// Add a no-permission database to the server state.
+    #[allow(dead_code)]
+    pub fn add_no_permission_database(&self, name: &str) {
+        let mut s = self.inner.lock().unwrap();
+        s.no_permission_databases.insert(name.to_string());
     }
 
+    /// Add a table to the server state.
+    #[allow(dead_code)]
+    pub fn add_table(&self, database: &str, table: &str) {
+        let mut s = self.inner.lock().unwrap();
+        s.databases.entry(database.to_string()).or_insert_with(|| {
+            // Auto-create database if not exists
+            GetDatabaseResponse::new(
+                Some(database.to_string()),
+                Some(database.to_string()),
+                None,
+                HashMap::new(),
+                AuditRESTResponse::new(None, None, None, None, None),
+            )
+        });
+
+        let key = format!("{}.{}", database, table);
+        s.tables.entry(key).or_insert_with(|| {
+            GetTableResponse::new(
+                Some(table.to_string()),
+                Some(table.to_string()),
+                None,
+                Some(true),
+                None,
+                None,
+                AuditRESTResponse::new(None, None, None, None, None),
+            )
+        });
+    }
+
+    /// Add a no-permission table to the server state.
+    #[allow(dead_code)]
+    pub fn add_no_permission_table(&self, database: &str, table: &str) {
+        let mut s = self.inner.lock().unwrap();
+        s.no_permission_tables
+            .insert(format!("{}.{}", database, table));
+    }
     /// Get the server URL.
     pub fn url(&self) -> Option<String> {
-        self.addr.map(|a| format!("http://{a}"))
+        self.addr.map(|a| format!("http://{}", a))
+    }
+    /// Get the warehouse path.
+    #[allow(dead_code)]
+    pub fn warehouse(&self) -> &str {
+        &self.warehouse
     }
 
+    /// Get the resource paths.
+    pub fn resource_paths(&self) -> &ResourcePaths {
+        &self.resource_paths
+    }
     /// Get the server address.
     #[allow(dead_code)]
     pub fn addr(&self) -> Option<SocketAddr> {
         self.addr
     }
+
+    /// Set ECS metadata role name and token for token loader testing.
+    #[allow(dead_code)]
+    pub fn set_ecs_metadata(&self, role_name: &str, token: serde_json::Value) {
+        let mut s = self.inner.lock().unwrap();
+        s.ecs_role_name = Some(role_name.to_string());
+        s.ecs_token = Some(token);
+    }
+
+    /// Handle GET /ram/security-credentials/:role - ECS metadata endpoint.
+    pub async fn get_ecs_metadata(
+        Path(role): Path<String>,
+        Extension(state): Extension<Arc<RESTServer>>,
+    ) -> impl IntoResponse {
+        let s = state.inner.lock().unwrap();
+
+        // If role_name is set and matches, return the token
+        if let Some(expected_role) = &s.ecs_role_name {
+            if &role == expected_role {
+                if let Some(token) = &s.ecs_token {
+                    return (StatusCode::OK, 
Json(token.clone())).into_response();
+                }
+            }
+        }
+
+        (
+            StatusCode::NOT_FOUND,
+            Json(json!({"error": "Role not found"})),
+        )
+            .into_response()
+    }
+
+    /// Handle GET /ram/security-credentials/ - ECS metadata endpoint (list roles).
+    pub async fn list_ecs_roles(Extension(state): Extension<Arc<RESTServer>>) 
-> impl IntoResponse {
+        let s = state.inner.lock().unwrap();
+
+        if let Some(role_name) = &s.ecs_role_name {
+            (StatusCode::OK, role_name.clone()).into_response()
+        } else {
+            (
+                StatusCode::NOT_FOUND,
+                Json(json!({"error": "No role configured"})),
+            )
+                .into_response()
+        }
+    }
 }
 
 impl Drop for RESTServer {
@@ -174,8 +678,35 @@ pub async fn start_mock_server(
         .route("/v1/config", get(RESTServer::get_config))
         // Database routes
         .route(
-            &format!("{prefix}/databases"),
-            get(RESTServer::list_databases),
+            &format!("{}/databases", prefix),
+            get(RESTServer::list_databases).post(RESTServer::create_database),
+        )
+        .route(
+            &format!("{}/databases/:name", prefix),
+            get(RESTServer::get_database)
+                .post(RESTServer::alter_database)
+                .delete(RESTServer::drop_database),
+        )
+        .route(
+            &format!("{}/databases/:db/tables", prefix),
+            get(RESTServer::list_tables).post(RESTServer::create_table),
+        )
+        .route(
+            &format!("{}/databases/:db/tables/:table", prefix),
+            get(RESTServer::get_table).delete(RESTServer::drop_table),
+        )
+        .route(
+            &format!("{}/tables/rename", prefix),
+            axum::routing::post(RESTServer::rename_table),
+        )
+        // ECS metadata endpoints (for token loader testing)
+        .route(
+            "/ram/security-credentials/",
+            get(RESTServer::list_ecs_roles),
+        )
+        .route(
+            "/ram/security-credentials/:role",
+            get(RESTServer::get_ecs_metadata),
         )
         .layer(Extension(state));
 
@@ -186,7 +717,7 @@ pub async fn start_mock_server(
 
     let server_handle = tokio::spawn(async move {
         if let Err(e) = axum::serve(listener, app.into_make_service()).await {
-            eprintln!("mock server error: {e}");
+            eprintln!("mock server error: {}", e);
         }
     });
 
diff --git a/crates/paimon/tests/rest_api_test.rs 
b/crates/paimon/tests/rest_api_test.rs
index 0932523..9bd31fc 100644
--- a/crates/paimon/tests/rest_api_test.rs
+++ b/crates/paimon/tests/rest_api_test.rs
@@ -18,20 +18,24 @@
 //! Integration tests for REST API.
 //!
 //! These tests use a mock server to verify the REST API client behavior.
+//! Both the mock server and API client run asynchronously using tokio.
 
 use std::collections::HashMap;
 
+use paimon::api::auth::{DLFECSTokenLoader, DLFToken, DLFTokenLoader};
 use paimon::api::rest_api::RESTApi;
 use paimon::api::ConfigResponse;
+use paimon::catalog::Identifier;
 use paimon::common::Options;
+use serde_json::json;
 
 mod mock_server;
 use mock_server::{start_mock_server, RESTServer};
-
 /// Helper struct to hold test resources.
 struct TestContext {
     server: RESTServer,
     api: RESTApi,
+    url: String,
 }
 
 /// Helper function to set up a test environment with a custom prefix.
@@ -45,14 +49,14 @@ async fn setup_test_server(initial_dbs: Vec<&str>) -> 
TestContext {
     let initial: Vec<String> = initial_dbs.iter().map(|s| 
s.to_string()).collect();
     // Start server with config
     let server = start_mock_server(
-        "test_warehouse".to_string(),
-        "/tmp/test_warehouse".to_string(),
+        "test_warehouse".to_string(),      // warehouse
+        "/tmp/test_warehouse".to_string(), // data_path
         config,
         initial,
     )
     .await;
     let token = "test_token";
-    let url = server.url().expect("server url");
+    let url = server.url().expect("Failed to get server URL");
     let mut options = Options::new();
     options.set("uri", &url);
     options.set("warehouse", "test_warehouse");
@@ -63,14 +67,13 @@ async fn setup_test_server(initial_dbs: Vec<&str>) -> 
TestContext {
         .await
         .expect("Failed to create RESTApi");
 
-    TestContext { server, api }
+    TestContext { server, api, url }
 }
 
 // ==================== Database Tests ====================
-
 #[tokio::test]
 async fn test_list_databases() {
-    let ctx = setup_test_server(vec!["default", "test_db1", "prod_db"]).await;
+    let mut ctx = setup_test_server(vec!["default", "test_db1", 
"prod_db"]).await;
 
     let dbs = ctx.api.list_databases().await.unwrap();
 
@@ -80,21 +83,393 @@ async fn test_list_databases() {
 }
 
 #[tokio::test]
-async fn test_list_databases_empty() {
-    let ctx = setup_test_server(vec![]).await;
+async fn test_create_database() {
+    let mut ctx = setup_test_server(vec!["default"]).await;
 
+    // Create new database
+    let result = ctx.api.create_database("new_db", None).await;
+    assert!(result.is_ok(), "failed to create database: {:?}", result);
+
+    // Verify creation
     let dbs = ctx.api.list_databases().await.unwrap();
-    assert!(dbs.is_empty());
+    assert!(dbs.contains(&"new_db".to_string()));
+
+    // Duplicate creation should fail
+    let result = ctx.api.create_database("new_db", None).await;
+    assert!(result.is_err(), "creating duplicate database should fail");
 }
 
 #[tokio::test]
-async fn test_list_databases_add_after_creation() {
+async fn test_get_database() {
+    let mut ctx = setup_test_server(vec!["default"]).await;
+
+    let db_resp = ctx.api.get_database("default").await.unwrap();
+    assert_eq!(db_resp.name, Some("default".to_string()));
+}
+
+#[tokio::test]
+async fn test_error_responses_status_mapping() {
     let ctx = setup_test_server(vec!["default"]).await;
 
-    // Add a new database after server creation
-    ctx.server.add_database("new_db");
+    // Add no-permission database
+    ctx.server.add_no_permission_database("secret");
+
+    // GET on no-permission database -> 403
+    // Use the prefix from config (v1/mock-test)
+    let url = format!("{}/v1/mock-test/databases/{}", ctx.url, "secret");
+    let result = reqwest::get(&url).await;
+    match result {
+        Ok(resp) => {
+            assert_eq!(resp.status(), 403);
+            let j: serde_json::Value = resp.json().await.unwrap();
+            assert_eq!(
+                j.get("resourceType").and_then(|v| v.as_str()),
+                Some("database")
+            );
+            assert_eq!(
+                j.get("resourceName").and_then(|v| v.as_str()),
+                Some("secret")
+            );
+            assert_eq!(j.get("code").and_then(|v| v.as_u64()), Some(403));
+        }
+        Err(e) => panic!("Expected 403 response, got error: {:?}", e),
+    }
 
+    // POST create existing database -> 409
+    let body = json!({"name": "default", "properties": {}});
+    let client = reqwest::Client::new();
+    let resp = client
+        .post(format!("{}/v1/mock-test/databases", ctx.url))
+        .json(&body)
+        .send()
+        .await
+        .unwrap();
+    assert_eq!(resp.status(), 409);
+
+    let j2: serde_json::Value = resp.json().await.unwrap();
+    assert_eq!(
+        j2.get("resourceType").and_then(|v| v.as_str()),
+        Some("database")
+    );
+    assert_eq!(
+        j2.get("resourceName").and_then(|v| v.as_str()),
+        Some("default")
+    );
+    assert_eq!(j2.get("code").and_then(|v| v.as_u64()), Some(409));
+}
+
+#[tokio::test]
+async fn test_alter_database() {
+    let mut ctx = setup_test_server(vec!["default"]).await;
+
+    // Alter database with updates
+    let mut updates = HashMap::new();
+    updates.insert("key1".to_string(), "value1".to_string());
+    updates.insert("key2".to_string(), "value2".to_string());
+
+    let result = ctx.api.alter_database("default", vec![], updates).await;
+    assert!(result.is_ok(), "failed to alter database: {:?}", result);
+
+    // Verify the updates by getting the database
+    let db_resp = ctx.api.get_database("default").await.unwrap();
+    assert_eq!(db_resp.options.get("key1"), Some(&"value1".to_string()));
+    assert_eq!(db_resp.options.get("key2"), Some(&"value2".to_string()));
+
+    // Alter database with removals
+    let result = ctx
+        .api
+        .alter_database("default", vec!["key1".to_string()], HashMap::new())
+        .await;
+    assert!(result.is_ok(), "failed to remove key: {:?}", result);
+
+    let db_resp = ctx.api.get_database("default").await.unwrap();
+    assert!(!db_resp.options.contains_key("key1"));
+    assert_eq!(db_resp.options.get("key2"), Some(&"value2".to_string()));
+}
+
+#[tokio::test]
+async fn test_alter_database_not_found() {
+    let mut ctx = setup_test_server(vec!["default"]).await;
+
+    let result = ctx
+        .api
+        .alter_database("non_existent", vec![], HashMap::new())
+        .await;
+    assert!(
+        result.is_err(),
+        "altering non-existent database should fail"
+    );
+}
+
+#[tokio::test]
+async fn test_drop_database() {
+    let mut ctx = setup_test_server(vec!["default", "to_drop"]).await;
+
+    // Verify database exists
     let dbs = ctx.api.list_databases().await.unwrap();
-    assert!(dbs.contains(&"default".to_string()));
-    assert!(dbs.contains(&"new_db".to_string()));
+    assert!(dbs.contains(&"to_drop".to_string()));
+
+    // Drop database
+    let result = ctx.api.drop_database("to_drop").await;
+    assert!(result.is_ok(), "failed to drop database: {:?}", result);
+
+    // Verify database is gone
+    let dbs = ctx.api.list_databases().await.unwrap();
+    assert!(!dbs.contains(&"to_drop".to_string()));
+
+    // Dropping non-existent database should fail
+    let result = ctx.api.drop_database("to_drop").await;
+    assert!(
+        result.is_err(),
+        "dropping non-existent database should fail"
+    );
+}
+
+#[tokio::test]
+async fn test_drop_database_no_permission() {
+    let mut ctx = setup_test_server(vec!["default"]).await;
+    ctx.server.add_no_permission_database("secret");
+
+    let result = ctx.api.drop_database("secret").await;
+    assert!(
+        result.is_err(),
+        "dropping no-permission database should fail"
+    );
+}
+// ==================== Table Tests ====================
+
+#[tokio::test]
+async fn test_list_tables_and_get_table() {
+    let mut ctx = setup_test_server(vec!["default"]).await;
+
+    // Add tables
+    ctx.server.add_table("default", "table1");
+    ctx.server.add_table("default", "table2");
+
+    // List tables
+    let tables = ctx.api.list_tables("default").await.unwrap();
+    assert!(tables.contains(&"table1".to_string()));
+    assert!(tables.contains(&"table2".to_string()));
+
+    // Get table
+    let table_resp = ctx
+        .api
+        .get_table(&Identifier::new("default", "table1"))
+        .await
+        .unwrap();
+    assert_eq!(table_resp.id.unwrap_or_default(), "table1");
+}
+
+#[tokio::test]
+async fn test_get_table_not_found() {
+    let mut ctx = setup_test_server(vec!["default"]).await;
+
+    let result = ctx
+        .api
+        .get_table(&Identifier::new("default", "non_existent_table"))
+        .await;
+    assert!(result.is_err(), "getting non-existent table should fail");
+}
+
+// Verifies that listing tables in a database with no registered tables
+// succeeds and yields an empty list (not an error).
+#[tokio::test]
+async fn test_list_tables_empty_database() {
+    let mut ctx = setup_test_server(vec!["default"]).await;
+
+    let tables = ctx.api.list_tables("default").await.unwrap();
+    assert!(
+        tables.is_empty(),
+        "expected empty tables list, got: {:?}",
+        tables
+    );
+}
+
+// Verifies that tables are scoped to their database: listing one database
+// must not leak tables registered under another.
+#[tokio::test]
+async fn test_multiple_databases_with_tables() {
+    let mut ctx = setup_test_server(vec!["db1", "db2"]).await;
+
+    // Add tables to different databases
+    ctx.server.add_table("db1", "table1_db1");
+    ctx.server.add_table("db1", "table2_db1");
+    ctx.server.add_table("db2", "table1_db2");
+
+    // Verify db1 tables
+    let tables_db1 = ctx.api.list_tables("db1").await.unwrap();
+    assert_eq!(tables_db1.len(), 2);
+    assert!(tables_db1.contains(&"table1_db1".to_string()));
+    assert!(tables_db1.contains(&"table2_db1".to_string()));
+
+    // Verify db2 tables
+    let tables_db2 = ctx.api.list_tables("db2").await.unwrap();
+    assert_eq!(tables_db2.len(), 1);
+    assert!(tables_db2.contains(&"table1_db2".to_string()));
+}
+
+// End-to-end create_table round trip: build a two-column schema, create the
+// table through the API, then confirm it is visible via both list_tables and
+// get_table.
+#[tokio::test]
+async fn test_create_table() {
+    let mut ctx = setup_test_server(vec!["default"]).await;
+
+    // Create a simple schema using builder
+    use paimon::spec::{DataType, Schema};
+    let schema = Schema::builder()
+        .column("id", DataType::BigInt(paimon::spec::BigIntType::new()))
+        .column(
+            "name",
+            // VarCharType::new is fallible (length validation), hence unwrap.
+            DataType::VarChar(paimon::spec::VarCharType::new(255).unwrap()),
+        )
+        .build()
+        .expect("Failed to build schema");
+
+    let result = ctx
+        .api
+        .create_table(&Identifier::new("default", "new_table"), schema)
+        .await;
+    assert!(result.is_ok(), "failed to create table: {:?}", result);
+
+    // Verify table exists
+    let tables = ctx.api.list_tables("default").await.unwrap();
+    assert!(tables.contains(&"new_table".to_string()));
+
+    // Get the table
+    let table_resp = ctx
+        .api
+        .get_table(&Identifier::new("default", "new_table"))
+        .await
+        .unwrap();
+    assert_eq!(table_resp.name, Some("new_table".to_string()));
+}
+
+// Verifies drop_table removes the table, and that a second drop of the same
+// (now missing) table fails instead of being silently idempotent.
+#[tokio::test]
+async fn test_drop_table() {
+    let mut ctx = setup_test_server(vec!["default"]).await;
+
+    // Add a table
+    ctx.server.add_table("default", "table_to_drop");
+
+    // Verify table exists
+    let tables = ctx.api.list_tables("default").await.unwrap();
+    assert!(tables.contains(&"table_to_drop".to_string()));
+
+    // Drop table
+    let result = ctx
+        .api
+        .drop_table(&Identifier::new("default", "table_to_drop"))
+        .await;
+    assert!(result.is_ok(), "failed to drop table: {:?}", result);
+
+    // Verify table is gone
+    let tables = ctx.api.list_tables("default").await.unwrap();
+    assert!(!tables.contains(&"table_to_drop".to_string()));
+
+    // Dropping non-existent table should fail
+    let result = ctx
+        .api
+        .drop_table(&Identifier::new("default", "table_to_drop"))
+        .await;
+    assert!(result.is_err(), "dropping non-existent table should fail");
+}
+
+// Verifies that drop_table surfaces a server-side permission denial on a
+// table as an Err.
+#[tokio::test]
+async fn test_drop_table_no_permission() {
+    let mut ctx = setup_test_server(vec!["default"]).await;
+    // Register the table so the mock server denies permission on it.
+    ctx.server
+        .add_no_permission_table("default", "secret_table");
+
+    let result = ctx
+        .api
+        .drop_table(&Identifier::new("default", "secret_table"))
+        .await;
+    assert!(result.is_err(), "dropping no-permission table should fail");
+}
+
+// ==================== Rename Table Tests ====================
+
+// Verifies rename_table within the same database: the old name disappears,
+// the new name is listed, and get_table on the new name reflects it.
+#[tokio::test]
+async fn test_rename_table() {
+    let mut ctx = setup_test_server(vec!["default"]).await;
+
+    // Add a table
+    ctx.server.add_table("default", "old_table");
+
+    // Rename table
+    let result = ctx
+        .api
+        .rename_table(
+            &Identifier::new("default", "old_table"),
+            &Identifier::new("default", "new_table"),
+        )
+        .await;
+    assert!(result.is_ok(), "failed to rename table: {:?}", result);
+
+    // Verify old table is gone (single post-rename listing checks both names)
+    let tables = ctx.api.list_tables("default").await.unwrap();
+    assert!(!tables.contains(&"old_table".to_string()));
+
+    // Verify new table exists
+    assert!(tables.contains(&"new_table".to_string()));
+
+    // Get the renamed table
+    let table_resp = ctx
+        .api
+        .get_table(&Identifier::new("default", "new_table"))
+        .await
+        .unwrap();
+    assert_eq!(table_resp.name, Some("new_table".to_string()));
+}
+
+// ==================== Token Loader Tests ====================
+
+// Exercises DLFECSTokenLoader against the mock server's ECS metadata
+// endpoint: loading credentials both without an explicit role name (the
+// loader must discover it) and with one supplied up front. Both paths must
+// yield the same key/secret/security-token/expiration fields set below.
+#[tokio::test]
+async fn test_ecs_loader_token() {
+    let prefix = "mock-test";
+    let mut defaults = HashMap::new();
+    defaults.insert("prefix".to_string(), prefix.to_string());
+    let config = ConfigResponse::new(defaults);
+
+    let initial: Vec<String> = vec!["default".to_string()];
+    let server = start_mock_server(
+        "test_warehouse".to_string(),
+        "/tmp/test_warehouse".to_string(),
+        config,
+        initial,
+    )
+    .await;
+
+    let role_name = "test_role";
+    // Shape mirrors the Alibaba Cloud ECS RAM-role credential document.
+    let token_json = json!({
+        "AccessKeyId": "AccessKeyId",
+        "AccessKeySecret": "AccessKeySecret",
+        "SecurityToken": "AQoDYXdzEJr...<remainder of security token>",
+        "Expiration": "2023-12-01T12:00:00Z"
+    });
+
+    server.set_ecs_metadata(role_name, token_json.clone());
+
+    let ecs_metadata_url = format!("{}/ram/security-credentials/", 
server.url().unwrap());
+
+    // Test without role name
+    let loader = DLFECSTokenLoader::new(&ecs_metadata_url, None);
+    let load_token: DLFToken = loader.load_token().await.unwrap();
+
+    assert_eq!(load_token.access_key_id, "AccessKeyId");
+    assert_eq!(load_token.access_key_secret, "AccessKeySecret");
+    assert_eq!(
+        load_token.security_token,
+        Some("AQoDYXdzEJr...<remainder of security token>".to_string())
+    );
+    assert_eq!(
+        load_token.expiration,
+        Some("2023-12-01T12:00:00Z".to_string())
+    );
+
+    // Test with role name
+    let loader_with_role = DLFECSTokenLoader::new(&ecs_metadata_url, 
Some(role_name.to_string()));
+    let token: DLFToken = loader_with_role.load_token().await.unwrap();
+
+    assert_eq!(token.access_key_id, "AccessKeyId");
+    assert_eq!(token.access_key_secret, "AccessKeySecret");
+    assert_eq!(
+        token.security_token,
+        Some("AQoDYXdzEJr...<remainder of security token>".to_string())
+    );
+    assert_eq!(token.expiration, Some("2023-12-01T12:00:00Z".to_string()));
 }

Reply via email to