This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new bc407713e9 [core] Support DLF auth (#4983)
bc407713e9 is described below

commit bc407713e9b60813222600982919a1285f75aeb8
Author: jerry <[email protected]>
AuthorDate: Mon Feb 24 17:54:43 2025 +0800

    [core] Support DLF auth (#4983)
---
 docs/content/concepts/rest-catalog.md              |  18 +-
 .../java/org/apache/paimon/utils/StringUtils.java  |   8 +
 .../java/org/apache/paimon/rest/HttpClient.java    |  89 +++++++--
 .../java/org/apache/paimon/rest/RESTCatalog.java   |  89 ++++++---
 .../org/apache/paimon/rest/RESTCatalogOptions.java |  50 +++--
 .../java/org/apache/paimon/rest/RESTClient.java    |  19 +-
 .../org/apache/paimon/rest/auth/AuthProvider.java  |  39 +---
 ...okenAuthProvider.java => AuthProviderEnum.java} |  38 ++--
 ...nAuthProvider.java => AuthProviderFactory.java} |  44 ++---
 .../org/apache/paimon/rest/auth/AuthSession.java   |  46 ++---
 .../paimon/rest/auth/BearTokenAuthProvider.java    |  17 +-
 ...ider.java => BearTokenAuthProviderFactory.java} |  30 +--
 .../rest/auth/BearTokenFileAuthProvider.java       | 101 ----------
 .../apache/paimon/rest/auth/DLFAuthProvider.java   | 207 ++++++++++++++++++++
 .../paimon/rest/auth/DLFAuthProviderFactory.java   |  65 +++++++
 .../apache/paimon/rest/auth/DLFAuthSignature.java  | 206 ++++++++++++++++++++
 .../java/org/apache/paimon/rest/auth/DLFToken.java |  92 +++++++++
 ...okenAuthProvider.java => RESTAuthFunction.java} |  34 ++--
 .../apache/paimon/rest/auth/RESTAuthParameter.java |  64 ++++++
 .../services/org.apache.paimon.factories.Factory   |   2 +
 .../org/apache/paimon/rest/HttpClientTest.java     |  48 ++++-
 .../org/apache/paimon/rest/RESTCatalogServer.java  |  10 +-
 .../org/apache/paimon/rest/RESTCatalogTest.java    |  22 +++
 .../apache/paimon/rest/auth/AuthProviderTest.java  |  82 --------
 .../apache/paimon/rest/auth/AuthSessionTest.java   | 214 +++++++++++++++++----
 .../paimon/rest/auth/DLFAuthSignatureTest.java     |  61 ++++++
 .../org/apache/paimon/flink/RESTCatalogITCase.java |   2 +
 27 files changed, 1224 insertions(+), 473 deletions(-)

diff --git a/docs/content/concepts/rest-catalog.md 
b/docs/content/concepts/rest-catalog.md
index 4645d86418..55613f5107 100644
--- a/docs/content/concepts/rest-catalog.md
+++ b/docs/content/concepts/rest-catalog.md
@@ -70,7 +70,8 @@ WITH (
 'dlf.accessKeySecret'='<accessKeySecret>',
 );
 ```
-- DLF token path
+
+- DLF sts token
 ```sql
 CREATE CATALOG `paimon-rest-catalog`
 WITH (
@@ -78,7 +79,20 @@ WITH (
 'uri' = '<catalog server url>',
 'metastore' = 'rest',
 'token.provider' = 'dlf',
-'dlf.token-path' = '<token-path>'
+'dlf.accessKeyId'='<accessKeyId>',
+'dlf.accessKeySecret'='<accessKeySecret>',
+'dlf.securityToken'='<securityToken>'
+);
+```
+
+- DLF sts token path
+```sql
+CREATE CATALOG `paimon-rest-catalog`
+WITH (
+'type' = 'paimon',
+'uri' = '<catalog server url>',
+'metastore' = 'rest',
+'token.provider' = 'dlf'
 );
 ```
 
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
index c4e07e0a69..5aa4e65f4c 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
@@ -558,4 +558,12 @@ public class StringUtils {
         }
         return true;
     }
+
+    // A null-safe trim method.
+    public static String trim(String value) {
+        if (value == null) {
+            return null;
+        }
+        return value.trim();
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java
index 5a13a51ef7..8aa3da86d8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java
@@ -20,8 +20,11 @@ package org.apache.paimon.rest;
 
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.rest.auth.RESTAuthFunction;
+import org.apache.paimon.rest.auth.RESTAuthParameter;
 import org.apache.paimon.rest.exceptions.RESTException;
 import org.apache.paimon.rest.responses.ErrorResponse;
+import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.StringUtils;
 
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
@@ -36,12 +39,17 @@ import okhttp3.RequestBody;
 import okhttp3.Response;
 
 import java.io.IOException;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static okhttp3.ConnectionSpec.CLEARTEXT;
 import static okhttp3.ConnectionSpec.COMPATIBLE_TLS;
@@ -82,32 +90,38 @@ public class HttpClient implements RESTClient {
 
     @Override
     public <T extends RESTResponse> T get(
-            String path, Class<T> responseType, Map<String, String> headers) {
+            String path, Class<T> responseType, RESTAuthFunction 
restAuthFunction) {
+        Map<String, String> authHeaders = getHeaders(path, "GET", "", 
restAuthFunction);
         Request request =
                 new Request.Builder()
                         .url(getRequestUrl(path))
                         .get()
-                        .headers(Headers.of(headers))
+                        .headers(Headers.of(authHeaders))
                         .build();
         return exec(request, responseType);
     }
 
     @Override
     public <T extends RESTResponse> T post(
-            String path, RESTRequest body, Map<String, String> headers) {
-        return post(path, body, null, headers);
+            String path, RESTRequest body, RESTAuthFunction restAuthFunction) {
+        return post(path, body, null, restAuthFunction);
     }
 
     @Override
     public <T extends RESTResponse> T post(
-            String path, RESTRequest body, Class<T> responseType, Map<String, 
String> headers) {
+            String path,
+            RESTRequest body,
+            Class<T> responseType,
+            RESTAuthFunction restAuthFunction) {
         try {
-            RequestBody requestBody = buildRequestBody(body);
+            String bodyStr = OBJECT_MAPPER.writeValueAsString(body);
+            Map<String, String> authHeaders = getHeaders(path, "POST", 
bodyStr, restAuthFunction);
+            RequestBody requestBody = buildRequestBody(bodyStr);
             Request request =
                     new Request.Builder()
                             .url(getRequestUrl(path))
                             .post(requestBody)
-                            .headers(Headers.of(headers))
+                            .headers(Headers.of(authHeaders))
                             .build();
             return exec(request, responseType);
         } catch (JsonProcessingException e) {
@@ -116,26 +130,29 @@ public class HttpClient implements RESTClient {
     }
 
     @Override
-    public <T extends RESTResponse> T delete(String path, Map<String, String> 
headers) {
+    public <T extends RESTResponse> T delete(String path, RESTAuthFunction 
restAuthFunction) {
+        Map<String, String> authHeaders = getHeaders(path, "DELETE", "", 
restAuthFunction);
         Request request =
                 new Request.Builder()
                         .url(getRequestUrl(path))
                         .delete()
-                        .headers(Headers.of(headers))
+                        .headers(Headers.of(authHeaders))
                         .build();
         return exec(request, null);
     }
 
     @Override
     public <T extends RESTResponse> T delete(
-            String path, RESTRequest body, Map<String, String> headers) {
+            String path, RESTRequest body, RESTAuthFunction restAuthFunction) {
         try {
-            RequestBody requestBody = buildRequestBody(body);
+            String bodyStr = OBJECT_MAPPER.writeValueAsString(body);
+            Map<String, String> authHeaders = getHeaders(path, "DELETE", 
bodyStr, restAuthFunction);
+            RequestBody requestBody = buildRequestBody(bodyStr);
             Request request =
                     new Request.Builder()
                             .url(getRequestUrl(path))
                             .delete(requestBody)
-                            .headers(Headers.of(headers))
+                            .headers(Headers.of(authHeaders))
                             .build();
             return exec(request, null);
         } catch (JsonProcessingException e) {
@@ -182,12 +199,8 @@ public class HttpClient implements RESTClient {
         }
     }
 
-    private RequestBody buildRequestBody(RESTRequest body) throws 
JsonProcessingException {
-        return RequestBody.create(OBJECT_MAPPER.writeValueAsBytes(body), 
MEDIA_TYPE);
-    }
-
-    private String getRequestUrl(String path) {
-        return StringUtils.isNullOrWhitespaceOnly(path) ? uri : uri + path;
+    private static RequestBody buildRequestBody(String body) throws 
JsonProcessingException {
+        return RequestBody.create(body.getBytes(StandardCharsets.UTF_8), 
MEDIA_TYPE);
     }
 
     private static OkHttpClient createHttpClient(HttpClientOptions 
httpClientOptions) {
@@ -221,4 +234,44 @@ public class HttpClient implements RESTClient {
 
         return builder.build();
     }
+
+    private String getRequestUrl(String path) {
+        return StringUtils.isNullOrWhitespaceOnly(path) ? uri : uri + path;
+    }
+
+    private Map<String, String> getHeaders(
+            String path,
+            String method,
+            String data,
+            Function<RESTAuthParameter, Map<String, String>> headerFunction) {
+        Pair<String, Map<String, String>> resourcePath2Parameters = 
parsePath(path);
+        RESTAuthParameter restAuthParameter =
+                new RESTAuthParameter(
+                        URI.create(uri).getHost(),
+                        resourcePath2Parameters.getLeft(),
+                        resourcePath2Parameters.getValue(),
+                        method,
+                        data);
+        return headerFunction.apply(restAuthParameter);
+    }
+
+    @VisibleForTesting
+    protected static Pair<String, Map<String, String>> parsePath(String path) {
+        String[] paths = path.split("\\?");
+        String resourcePath = paths[0];
+        if (paths.length == 1) {
+            return Pair.of(resourcePath, Collections.emptyMap());
+        }
+        String query = paths[1];
+        Map<String, String> parameters =
+                Arrays.stream(query.split("&"))
+                        .map(pair -> pair.split("=", 2))
+                        .collect(
+                                Collectors.toMap(
+                                        pair -> pair[0].trim(), // key
+                                        pair -> pair[1].trim(), // value
+                                        (existing, replacement) -> existing // 
handle duplicates
+                                        ));
+        return Pair.of(resourcePath, parameters);
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index 6469859e80..d7c5990aa6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -32,6 +32,8 @@ import org.apache.paimon.operation.FileStoreCommit;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.Partition;
 import org.apache.paimon.rest.auth.AuthSession;
+import org.apache.paimon.rest.auth.RESTAuthFunction;
+import org.apache.paimon.rest.auth.RESTAuthParameter;
 import org.apache.paimon.rest.exceptions.AlreadyExistsException;
 import org.apache.paimon.rest.exceptions.BadRequestException;
 import org.apache.paimon.rest.exceptions.ForbiddenException;
@@ -108,6 +110,7 @@ public class RESTCatalog implements Catalog {
     private final CatalogContext context;
     private final boolean dataTokenEnabled;
     private final FileIO fileIO;
+    private RESTAuthFunction restAuthFunction;
 
     private volatile ScheduledExecutorService refreshExecutor = null;
 
@@ -118,23 +121,25 @@ public class RESTCatalog implements Catalog {
     public RESTCatalog(CatalogContext context, boolean configRequired) {
         this.client = new HttpClient(context.options());
         this.catalogAuth = createAuthSession(context.options(), 
tokenRefreshExecutor());
-
         Options options = context.options();
+        Map<String, String> baseHeaders = Collections.emptyMap();
         if (configRequired) {
             if (context.options().contains(WAREHOUSE)) {
                 throw new IllegalArgumentException("Can not config warehouse 
in RESTCatalog.");
             }
 
-            Map<String, String> initHeaders =
-                    RESTUtil.merge(
-                            extractPrefixMap(context.options(), HEADER_PREFIX),
-                            catalogAuth.getHeaders());
+            baseHeaders = extractPrefixMap(context.options(), HEADER_PREFIX);
             options =
                     new Options(
-                            client.get(ResourcePaths.V1_CONFIG, 
ConfigResponse.class, initHeaders)
+                            client.get(
+                                            ResourcePaths.V1_CONFIG,
+                                            ConfigResponse.class,
+                                            new RESTAuthFunction(
+                                                    Collections.emptyMap(), 
catalogAuth))
                                     .merge(context.options().toMap()));
+            baseHeaders.putAll(extractPrefixMap(options, HEADER_PREFIX));
         }
-
+        this.restAuthFunction = new RESTAuthFunction(baseHeaders, catalogAuth);
         context = CatalogContext.create(options, context.preferIO(), 
context.fallbackIO());
         this.context = context;
         this.resourcePaths = ResourcePaths.forCatalogProperties(options);
@@ -156,7 +161,8 @@ public class RESTCatalog implements Catalog {
     @Override
     public List<String> listDatabases() {
         ListDatabasesResponse response =
-                client.get(resourcePaths.databases(), 
ListDatabasesResponse.class, headers());
+                client.get(
+                        resourcePaths.databases(), 
ListDatabasesResponse.class, restAuthFunction);
         if (response.getDatabases() != null) {
             return response.getDatabases();
         }
@@ -170,7 +176,10 @@ public class RESTCatalog implements Catalog {
         CreateDatabaseRequest request = new CreateDatabaseRequest(name, 
properties);
         try {
             client.post(
-                    resourcePaths.databases(), request, 
CreateDatabaseResponse.class, headers());
+                    resourcePaths.databases(),
+                    request,
+                    CreateDatabaseResponse.class,
+                    restAuthFunction);
         } catch (AlreadyExistsException e) {
             if (!ignoreIfExists) {
                 throw new DatabaseAlreadyExistException(name);
@@ -187,7 +196,10 @@ public class RESTCatalog implements Catalog {
         }
         try {
             GetDatabaseResponse response =
-                    client.get(resourcePaths.database(name), 
GetDatabaseResponse.class, headers());
+                    client.get(
+                            resourcePaths.database(name),
+                            GetDatabaseResponse.class,
+                            restAuthFunction);
             return new Database.DatabaseImpl(
                     name, response.options(), response.comment().orElseGet(() 
-> null));
         } catch (NoSuchResourceException e) {
@@ -205,7 +217,7 @@ public class RESTCatalog implements Catalog {
             if (!cascade && !this.listTables(name).isEmpty()) {
                 throw new DatabaseNotEmptyException(name);
             }
-            client.delete(resourcePaths.database(name), headers());
+            client.delete(resourcePaths.database(name), restAuthFunction);
         } catch (NoSuchResourceException | DatabaseNotExistException e) {
             if (!ignoreIfNotExists) {
                 throw new DatabaseNotExistException(name);
@@ -231,7 +243,7 @@ public class RESTCatalog implements Catalog {
                             resourcePaths.databaseProperties(name),
                             request,
                             AlterDatabaseResponse.class,
-                            headers());
+                            restAuthFunction);
             if (response.getUpdated().isEmpty()) {
                 throw new IllegalStateException("Failed to update properties");
             }
@@ -251,7 +263,7 @@ public class RESTCatalog implements Catalog {
                     client.get(
                             resourcePaths.tables(databaseName),
                             ListTablesResponse.class,
-                            headers());
+                            restAuthFunction);
             if (response.getTables() != null) {
                 return response.getTables();
             }
@@ -290,7 +302,7 @@ public class RESTCatalog implements Catalog {
         return client.get(
                 resourcePaths.tableToken(identifier.getDatabaseName(), 
identifier.getObjectName()),
                 GetTableTokenResponse.class,
-                catalogAuth.getHeaders());
+                restAuthFunction);
     }
 
     public boolean commitSnapshot(Identifier identifier, Snapshot snapshot) {
@@ -300,7 +312,7 @@ public class RESTCatalog implements Catalog {
                         
resourcePaths.commitTable(identifier.getDatabaseName()),
                         request,
                         CommitTableResponse.class,
-                        headers());
+                        restAuthFunction);
         return response.isSuccess();
     }
 
@@ -312,7 +324,7 @@ public class RESTCatalog implements Catalog {
                             resourcePaths.table(
                                     identifier.getDatabaseName(), 
identifier.getTableName()),
                             GetTableResponse.class,
-                            headers());
+                            restAuthFunction);
         } catch (NoSuchResourceException e) {
             throw new TableNotExistException(identifier);
         } catch (ForbiddenException e) {
@@ -331,7 +343,9 @@ public class RESTCatalog implements Catalog {
             checkNotSystemTable(identifier, "createTable");
             validateAutoCreateClose(schema.options());
             CreateTableRequest request = new CreateTableRequest(identifier, 
schema);
-            client.post(resourcePaths.tables(identifier.getDatabaseName()), 
request, headers());
+            client.post(
+                    resourcePaths.tables(identifier.getDatabaseName()), 
request, restAuthFunction);
+
         } catch (AlreadyExistsException e) {
             if (!ignoreIfExists) {
                 throw new TableAlreadyExistException(identifier);
@@ -356,7 +370,10 @@ public class RESTCatalog implements Catalog {
         checkNotSystemTable(toTable, "renameTable");
         try {
             RenameTableRequest request = new RenameTableRequest(fromTable, 
toTable);
-            
client.post(resourcePaths.renameTable(fromTable.getDatabaseName()), request, 
headers());
+            client.post(
+                    resourcePaths.renameTable(fromTable.getDatabaseName()),
+                    request,
+                    restAuthFunction);
         } catch (NoSuchResourceException e) {
             if (!ignoreIfNotExists) {
                 throw new TableNotExistException(fromTable);
@@ -378,7 +395,7 @@ public class RESTCatalog implements Catalog {
             client.post(
                     resourcePaths.table(identifier.getDatabaseName(), 
identifier.getTableName()),
                     request,
-                    headers());
+                    restAuthFunction);
         } catch (NoSuchResourceException e) {
             if (!ignoreIfNotExists) {
                 if (e.resourceType() == ErrorResponseResourceType.TABLE) {
@@ -408,7 +425,7 @@ public class RESTCatalog implements Catalog {
         try {
             client.delete(
                     resourcePaths.table(identifier.getDatabaseName(), 
identifier.getTableName()),
-                    headers());
+                    restAuthFunction);
         } catch (NoSuchResourceException e) {
             if (!ignoreIfNotExists) {
                 throw new TableNotExistException(identifier);
@@ -427,7 +444,7 @@ public class RESTCatalog implements Catalog {
                     resourcePaths.partitions(
                             identifier.getDatabaseName(), 
identifier.getTableName()),
                     request,
-                    headers());
+                    restAuthFunction);
         } catch (NoSuchResourceException e) {
             throw new TableNotExistException(identifier);
         } catch (NotImplementedException ignored) {
@@ -444,7 +461,7 @@ public class RESTCatalog implements Catalog {
                     resourcePaths.dropPartitions(
                             identifier.getDatabaseName(), 
identifier.getTableName()),
                     request,
-                    headers());
+                    restAuthFunction);
         } catch (NoSuchResourceException e) {
             throw new TableNotExistException(identifier);
         } catch (NotImplementedException ignored) {
@@ -470,7 +487,7 @@ public class RESTCatalog implements Catalog {
                     resourcePaths.alterPartitions(
                             identifier.getDatabaseName(), 
identifier.getTableName()),
                     request,
-                    headers());
+                    restAuthFunction);
         } catch (NoSuchResourceException e) {
             throw new TableNotExistException(identifier);
         } catch (NotImplementedException ignored) {
@@ -487,7 +504,7 @@ public class RESTCatalog implements Catalog {
                     resourcePaths.markDonePartitions(
                             identifier.getDatabaseName(), 
identifier.getTableName()),
                     request,
-                    headers());
+                    restAuthFunction);
         } catch (NoSuchResourceException e) {
             throw new TableNotExistException(identifier);
         } catch (NotImplementedException ignored) {
@@ -503,7 +520,7 @@ public class RESTCatalog implements Catalog {
                             resourcePaths.partitions(
                                     identifier.getDatabaseName(), 
identifier.getTableName()),
                             ListPartitionsResponse.class,
-                            headers());
+                            restAuthFunction);
             if (response == null || response.getPartitions() == null) {
                 return Collections.emptyList();
             }
@@ -526,7 +543,7 @@ public class RESTCatalog implements Catalog {
                             resourcePaths.view(
                                     identifier.getDatabaseName(), 
identifier.getTableName()),
                             GetViewResponse.class,
-                            headers());
+                            restAuthFunction);
             ViewSchema schema = response.getSchema();
             return new ViewImpl(
                     identifier,
@@ -546,7 +563,7 @@ public class RESTCatalog implements Catalog {
         try {
             client.delete(
                     resourcePaths.view(identifier.getDatabaseName(), 
identifier.getTableName()),
-                    headers());
+                    restAuthFunction);
         } catch (NoSuchResourceException e) {
             if (!ignoreIfNotExists) {
                 throw new ViewNotExistException(identifier);
@@ -566,7 +583,9 @@ public class RESTCatalog implements Catalog {
                             view.comment().orElse(null),
                             view.options());
             CreateViewRequest request = new CreateViewRequest(identifier, 
schema);
-            client.post(resourcePaths.views(identifier.getDatabaseName()), 
request, headers());
+            client.post(
+                    resourcePaths.views(identifier.getDatabaseName()), 
request, restAuthFunction);
+
         } catch (NoSuchResourceException e) {
             throw new DatabaseNotExistException(identifier.getDatabaseName());
         } catch (AlreadyExistsException e) {
@@ -581,7 +600,9 @@ public class RESTCatalog implements Catalog {
         try {
             ListViewsResponse response =
                     client.get(
-                            resourcePaths.views(databaseName), 
ListViewsResponse.class, headers());
+                            resourcePaths.views(databaseName),
+                            ListViewsResponse.class,
+                            restAuthFunction);
             return response.getViews();
         } catch (NoSuchResourceException e) {
             throw new DatabaseNotExistException(databaseName);
@@ -593,7 +614,11 @@ public class RESTCatalog implements Catalog {
             throws ViewNotExistException, ViewAlreadyExistException {
         try {
             RenameTableRequest request = new RenameTableRequest(fromView, 
toView);
-            client.post(resourcePaths.renameView(fromView.getDatabaseName()), 
request, headers());
+            client.post(
+                    resourcePaths.renameView(fromView.getDatabaseName()),
+                    request,
+                    restAuthFunction);
+
         } catch (NoSuchResourceException e) {
             if (!ignoreIfNotExists) {
                 throw new ViewNotExistException(fromView);
@@ -618,8 +643,8 @@ public class RESTCatalog implements Catalog {
         }
     }
 
-    private Map<String, String> headers() {
-        return catalogAuth.getHeaders();
+    Map<String, String> headers(RESTAuthParameter restAuthParameter) {
+        return restAuthFunction.apply(restAuthParameter);
     }
 
     private ScheduledExecutorService tokenRefreshExecutor() {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java
index 35b72469bd..310e3335ec 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java
@@ -60,23 +60,49 @@ public class RESTCatalogOptions {
             ConfigOptions.key("token")
                     .stringType()
                     .noDefaultValue()
-                    .withDescription("REST Catalog auth token.");
+                    .withDescription("REST Catalog auth bear token.");
 
-    public static final ConfigOption<Duration> TOKEN_EXPIRATION_TIME =
-            ConfigOptions.key("token.expiration-time")
+    public static final ConfigOption<Duration> TOKEN_REFRESH_TIME =
+            ConfigOptions.key("token.refresh-time")
                     .durationType()
                     .defaultValue(Duration.ofHours(1))
-                    .withDescription(
-                            "REST Catalog auth token expires time.The token 
generates system refresh frequency is t1,"
-                                    + " the token expires time is t2, we need 
to guarantee that t2 > t1,"
-                                    + " the token validity time is [t2 - t1, 
t2],"
-                                    + " and the expires time defined here 
needs to be less than (t2 - t1)");
-
-    public static final ConfigOption<String> TOKEN_PROVIDER_PATH =
-            ConfigOptions.key("token.provider.path")
+                    .withDescription("REST Catalog auth token refresh time.");
+
+    public static final ConfigOption<String> TOKEN_PROVIDER =
+            ConfigOptions.key("token.provider")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("REST Catalog auth token provider.");
+
+    public static final ConfigOption<String> DLF_TOKEN_PATH =
+            ConfigOptions.key("dlf.token-path")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("REST Catalog auth DLF token file path.");
+
+    public static final ConfigOption<String> DLF_ACCESS_KEY_ID =
+            ConfigOptions.key("dlf.accessKeyId")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("REST Catalog auth DLF access key id");
+
+    public static final ConfigOption<String> DLF_ACCESS_KEY_SECRET =
+            ConfigOptions.key("dlf.accessKeySecret")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("REST Catalog auth DLF access key 
secret");
+
+    public static final ConfigOption<String> DLF_SECURITY_TOKEN =
+            ConfigOptions.key("dlf.securityToken")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("REST Catalog auth DLF security token");
+
+    public static final ConfigOption<String> DLF_ROLE_SESSION_NAME =
+            ConfigOptions.key("dlf.roleSessionName")
                     .stringType()
                     .noDefaultValue()
-                    .withDescription("REST Catalog auth token provider path.");
+                    .withDescription("REST Catalog auth DLF role session 
name");
 
     public static final ConfigOption<Boolean> DATA_TOKEN_ENABLED =
             ConfigOptions.key("data-token.enabled")
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTClient.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTClient.java
index d816f09ed0..7446836653 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTClient.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTClient.java
@@ -18,20 +18,27 @@
 
 package org.apache.paimon.rest;
 
+import org.apache.paimon.rest.auth.RESTAuthFunction;
+
 import java.io.Closeable;
-import java.util.Map;
 
 /** Interface for a basic HTTP Client for interfacing with the REST catalog. */
 public interface RESTClient extends Closeable {
 
-    <T extends RESTResponse> T get(String path, Class<T> responseType, 
Map<String, String> headers);
+    <T extends RESTResponse> T get(
+            String path, Class<T> responseType, RESTAuthFunction 
restAuthFunction);
 
-    <T extends RESTResponse> T post(String path, RESTRequest body, Map<String, 
String> headers);
+    <T extends RESTResponse> T post(
+            String path, RESTRequest body, RESTAuthFunction restAuthFunction);
 
     <T extends RESTResponse> T post(
-            String path, RESTRequest body, Class<T> responseType, Map<String, 
String> headers);
+            String path,
+            RESTRequest body,
+            Class<T> responseType,
+            RESTAuthFunction restAuthFunction);
 
-    <T extends RESTResponse> T delete(String path, Map<String, String> 
headers);
+    <T extends RESTResponse> T delete(String path, RESTAuthFunction 
restAuthFunction);
 
-    <T extends RESTResponse> T delete(String path, RESTRequest body, 
Map<String, String> headers);
+    <T extends RESTResponse> T delete(
+            String path, RESTRequest body, RESTAuthFunction restAuthFunction);
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthProvider.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthProvider.java
index 26c9490dc9..666337ec79 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthProvider.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthProvider.java
@@ -18,27 +18,16 @@
 
 package org.apache.paimon.rest.auth;
 
-import org.apache.paimon.options.Options;
-import org.apache.paimon.rest.RESTCatalogOptions;
-import org.apache.paimon.utils.StringUtils;
-
 import java.util.Map;
 import java.util.Optional;
 
-import static org.apache.paimon.rest.RESTCatalogOptions.TOKEN_EXPIRATION_TIME;
-import static org.apache.paimon.rest.RESTCatalogOptions.TOKEN_PROVIDER_PATH;
-
 /** Authentication provider. */
 public interface AuthProvider {
 
-    Map<String, String> authHeader();
+    Map<String, String> header(Map<String, String> baseHeader, 
RESTAuthParameter restAuthParameter);
 
     boolean refresh();
 
-    default boolean supportRefresh() {
-        return false;
-    }
-
     default boolean keepRefreshed() {
         return false;
     }
@@ -51,31 +40,7 @@ public interface AuthProvider {
         return Optional.empty();
     }
 
-    default Optional<Long> expiresInMills() {
+    default Optional<Long> tokenRefreshInMills() {
         return Optional.empty();
     }
-
-    static AuthProvider create(Options options) {
-        if 
(options.getOptional(RESTCatalogOptions.TOKEN_PROVIDER_PATH).isPresent()) {
-            if (!options.getOptional(TOKEN_PROVIDER_PATH).isPresent()) {
-                throw new IllegalArgumentException(TOKEN_PROVIDER_PATH.key() + 
" is required");
-            }
-            String tokenFilePath = options.get(TOKEN_PROVIDER_PATH);
-            if (options.getOptional(TOKEN_EXPIRATION_TIME).isPresent()) {
-                long tokenExpireInMills = 
options.get(TOKEN_EXPIRATION_TIME).toMillis();
-                return new BearTokenFileAuthProvider(tokenFilePath, 
tokenExpireInMills);
-
-            } else {
-                return new BearTokenFileAuthProvider(tokenFilePath);
-            }
-        } else {
-            if (options.getOptional(RESTCatalogOptions.TOKEN)
-                    .map(StringUtils::isNullOrWhitespaceOnly)
-                    .orElse(true)) {
-                throw new IllegalArgumentException(
-                        RESTCatalogOptions.TOKEN.key() + " is required and not 
empty");
-            }
-            return new 
BearTokenAuthProvider(options.get(RESTCatalogOptions.TOKEN));
-        }
-    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenAuthProvider.java
 b/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthProviderEnum.java
similarity index 55%
copy from 
paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenAuthProvider.java
copy to 
paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthProviderEnum.java
index 73e5081a5e..c84a38a485 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenAuthProvider.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthProviderEnum.java
@@ -18,33 +18,27 @@
 
 package org.apache.paimon.rest.auth;
 
-import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
+/** AuthProviderEnum. */
+public enum AuthProviderEnum {
+    BEAR("bear"),
+    DLF("dlf");
 
-import java.util.Map;
+    private final String identifier;
 
-/** Auth provider for bear token. */
-public class BearTokenAuthProvider implements AuthProvider {
-
-    private static final String AUTHORIZATION_HEADER = "Authorization";
-    private static final String BEARER_PREFIX = "Bearer ";
-
-    protected String token;
-
-    public BearTokenAuthProvider(String token) {
-        this.token = token;
-    }
-
-    public String token() {
-        return token;
+    AuthProviderEnum(String identifier) {
+        this.identifier = identifier;
     }
 
-    @Override
-    public Map<String, String> authHeader() {
-        return ImmutableMap.of(AUTHORIZATION_HEADER, BEARER_PREFIX + token);
+    public String identifier() {
+        return identifier;
     }
 
-    @Override
-    public boolean refresh() {
-        return true;
+    public static AuthProviderEnum fromString(String identifier) {
+        for (AuthProviderEnum authProviderEnum : AuthProviderEnum.values()) {
+            if (authProviderEnum.identifier.equals(identifier)) {
+                return authProviderEnum;
+            }
+        }
+        throw new IllegalArgumentException("Unknown AuthProvider type: " + 
identifier);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenAuthProvider.java
 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthProviderFactory.java
similarity index 54%
copy from 
paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenAuthProvider.java
copy to 
paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthProviderFactory.java
index 73e5081a5e..1cccfeb626 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenAuthProvider.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthProviderFactory.java
@@ -18,33 +18,21 @@
 
 package org.apache.paimon.rest.auth;
 
-import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
-
-import java.util.Map;
-
-/** Auth provider for bear token. */
-public class BearTokenAuthProvider implements AuthProvider {
-
-    private static final String AUTHORIZATION_HEADER = "Authorization";
-    private static final String BEARER_PREFIX = "Bearer ";
-
-    protected String token;
-
-    public BearTokenAuthProvider(String token) {
-        this.token = token;
-    }
-
-    public String token() {
-        return token;
-    }
-
-    @Override
-    public Map<String, String> authHeader() {
-        return ImmutableMap.of(AUTHORIZATION_HEADER, BEARER_PREFIX + token);
-    }
-
-    @Override
-    public boolean refresh() {
-        return true;
+import org.apache.paimon.factories.Factory;
+import org.apache.paimon.factories.FactoryUtil;
+import org.apache.paimon.options.Options;
+
+/** Factory for {@link AuthProvider}. */
+public interface AuthProviderFactory extends Factory {
+
+    AuthProvider create(Options options);
+
+    static AuthProvider createAuthProvider(String name, Options options) {
+        AuthProviderFactory factory =
+                FactoryUtil.discoverFactory(
+                        AuthProviderFactory.class.getClassLoader(),
+                        AuthProviderFactory.class,
+                        name);
+        return factory.create(options);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthSession.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthSession.java
index 5f77002c0d..1ce391e00d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthSession.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthSession.java
@@ -20,19 +20,16 @@ package org.apache.paimon.rest.auth;
 
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.options.Options;
-import org.apache.paimon.rest.RESTUtil;
+import org.apache.paimon.rest.RESTCatalogOptions;
+import org.apache.paimon.utils.StringUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.paimon.rest.RESTCatalog.HEADER_PREFIX;
-import static org.apache.paimon.rest.RESTUtil.extractPrefixMap;
-
 /** Authentication session. */
 public class AuthSession {
 
@@ -43,18 +40,14 @@ public class AuthSession {
     private static final Logger LOG = 
LoggerFactory.getLogger(AuthSession.class);
 
     private final AuthProvider authProvider;
-    private volatile Map<String, String> headers;
 
-    public AuthSession(Map<String, String> headers, AuthProvider authProvider) 
{
+    public AuthSession(AuthProvider authProvider) {
         this.authProvider = authProvider;
-        this.headers = RESTUtil.merge(headers, this.authProvider.authHeader());
     }
 
     public static AuthSession fromRefreshAuthProvider(
-            ScheduledExecutorService executor,
-            Map<String, String> headers,
-            AuthProvider authProvider) {
-        AuthSession session = new AuthSession(headers, authProvider);
+            ScheduledExecutorService executor, AuthProvider authProvider) {
+        AuthSession session = new AuthSession(authProvider);
 
         long startTimeMillis = System.currentTimeMillis();
         Optional<Long> expiresAtMillisOpt = authProvider.expiresAtMillis();
@@ -75,23 +68,17 @@ public class AuthSession {
         return session;
     }
 
-    public Map<String, String> getHeaders() {
+    public AuthProvider getAuthProvider() {
         if (this.authProvider.keepRefreshed() && 
this.authProvider.willSoonExpire()) {
             refresh();
         }
-        return headers;
+        return this.authProvider;
     }
 
     public Boolean refresh() {
-        if (this.authProvider.supportRefresh()
-                && this.authProvider.keepRefreshed()
-                && this.authProvider.expiresInMills().isPresent()) {
-            boolean isSuccessful = this.authProvider.refresh();
-            if (isSuccessful) {
-                Map<String, String> currentHeaders = this.headers;
-                this.headers = RESTUtil.merge(currentHeaders, 
this.authProvider.authHeader());
-            }
-            return isSuccessful;
+        if (this.authProvider.keepRefreshed()
+                && this.authProvider.tokenRefreshInMills().isPresent()) {
+            return this.authProvider.refresh();
         }
 
         return false;
@@ -142,7 +129,7 @@ public class AuthSession {
             scheduleTokenRefresh(
                     executor,
                     session,
-                    refreshStartTime + 
session.authProvider.expiresInMills().get(),
+                    refreshStartTime + 
session.authProvider.tokenRefreshInMills().get(),
                     0);
         } else {
             scheduleTokenRefresh(executor, session, expiresAtMillis, 
retryTimes + 1);
@@ -151,12 +138,15 @@ public class AuthSession {
 
     public static AuthSession createAuthSession(
             Options options, ScheduledExecutorService refreshExecutor) {
-        Map<String, String> baseHeader = extractPrefixMap(options, 
HEADER_PREFIX);
-        AuthProvider authProvider = AuthProvider.create(options);
+        String tokenProvider = options.get(RESTCatalogOptions.TOKEN_PROVIDER);
+        if (StringUtils.isEmpty(tokenProvider)) {
+            throw new IllegalArgumentException("token.provider is not set.");
+        }
+        AuthProvider authProvider = 
AuthProviderFactory.createAuthProvider(tokenProvider, options);
         if (authProvider.keepRefreshed()) {
-            return AuthSession.fromRefreshAuthProvider(refreshExecutor, 
baseHeader, authProvider);
+            return AuthSession.fromRefreshAuthProvider(refreshExecutor, 
authProvider);
         } else {
-            return new AuthSession(baseHeader, authProvider);
+            return new AuthSession(authProvider);
         }
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenAuthProvider.java
 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenAuthProvider.java
index 73e5081a5e..a501b98556 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenAuthProvider.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenAuthProvider.java
@@ -18,14 +18,14 @@
 
 package org.apache.paimon.rest.auth;
 
-import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
-
+import java.util.HashMap;
 import java.util.Map;
 
 /** Auth provider for bear token. */
 public class BearTokenAuthProvider implements AuthProvider {
 
-    private static final String AUTHORIZATION_HEADER = "Authorization";
+    public static final String AUTHORIZATION_HEADER_KEY = "Authorization";
+
     private static final String BEARER_PREFIX = "Bearer ";
 
     protected String token;
@@ -34,13 +34,12 @@ public class BearTokenAuthProvider implements AuthProvider {
         this.token = token;
     }
 
-    public String token() {
-        return token;
-    }
-
     @Override
-    public Map<String, String> authHeader() {
-        return ImmutableMap.of(AUTHORIZATION_HEADER, BEARER_PREFIX + token);
+    public Map<String, String> header(
+            Map<String, String> baseHeader, RESTAuthParameter 
restAuthParameter) {
+        Map<String, String> headersWithAuth = new HashMap<>(baseHeader);
+        headersWithAuth.put(AUTHORIZATION_HEADER_KEY, BEARER_PREFIX + token);
+        return headersWithAuth;
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenAuthProvider.java
 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenAuthProviderFactory.java
similarity index 56%
copy from 
paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenAuthProvider.java
copy to 
paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenAuthProviderFactory.java
index 73e5081a5e..a32c526bb0 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenAuthProvider.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenAuthProviderFactory.java
@@ -18,33 +18,19 @@
 
 package org.apache.paimon.rest.auth;
 
-import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.rest.RESTCatalogOptions;
 
-import java.util.Map;
-
-/** Auth provider for bear token. */
-public class BearTokenAuthProvider implements AuthProvider {
-
-    private static final String AUTHORIZATION_HEADER = "Authorization";
-    private static final String BEARER_PREFIX = "Bearer ";
-
-    protected String token;
-
-    public BearTokenAuthProvider(String token) {
-        this.token = token;
-    }
-
-    public String token() {
-        return token;
-    }
+/** Factory for {@link BearTokenAuthProvider}. */
+public class BearTokenAuthProviderFactory implements AuthProviderFactory {
 
     @Override
-    public Map<String, String> authHeader() {
-        return ImmutableMap.of(AUTHORIZATION_HEADER, BEARER_PREFIX + token);
+    public String identifier() {
+        return AuthProviderEnum.BEAR.identifier();
     }
 
     @Override
-    public boolean refresh() {
-        return true;
+    public AuthProvider create(Options options) {
+        return new 
BearTokenAuthProvider(options.get(RESTCatalogOptions.TOKEN));
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenFileAuthProvider.java
 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenFileAuthProvider.java
deleted file mode 100644
index a1ecb0b26c..0000000000
--- 
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenFileAuthProvider.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.rest.auth;
-
-import org.apache.paimon.utils.FileIOUtils;
-import org.apache.paimon.utils.StringUtils;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.util.Optional;
-
-/** Auth provider for get bear token from file. */
-public class BearTokenFileAuthProvider extends BearTokenAuthProvider {
-
-    public static final double EXPIRED_FACTOR = 0.4;
-
-    private final String tokenFilePath;
-
-    private boolean keepRefreshed = false;
-    private Long expiresAtMillis = null;
-    private Long expiresInMills = null;
-
-    public BearTokenFileAuthProvider(String tokenFilePath) {
-        super(readToken(tokenFilePath));
-        this.tokenFilePath = tokenFilePath;
-    }
-
-    public BearTokenFileAuthProvider(String tokenFilePath, Long 
expiresInMills) {
-        this(tokenFilePath);
-        this.keepRefreshed = true;
-        this.expiresAtMillis = -1L;
-        this.expiresInMills = expiresInMills;
-    }
-
-    @Override
-    public boolean refresh() {
-        long start = System.currentTimeMillis();
-        String newToken = readToken(tokenFilePath);
-        if (StringUtils.isNullOrWhitespaceOnly(newToken)) {
-            return false;
-        }
-        this.expiresAtMillis = start + this.expiresInMills;
-        this.token = newToken;
-        return true;
-    }
-
-    @Override
-    public boolean supportRefresh() {
-        return true;
-    }
-
-    @Override
-    public boolean keepRefreshed() {
-        return this.keepRefreshed;
-    }
-
-    @Override
-    public boolean willSoonExpire() {
-        if (keepRefreshed()) {
-            return expiresAtMillis().get() - System.currentTimeMillis()
-                    < expiresInMills().get() * EXPIRED_FACTOR;
-        } else {
-            return false;
-        }
-    }
-
-    @Override
-    public Optional<Long> expiresAtMillis() {
-        return Optional.ofNullable(this.expiresAtMillis);
-    }
-
-    @Override
-    public Optional<Long> expiresInMills() {
-        return Optional.ofNullable(this.expiresInMills);
-    }
-
-    private static String readToken(String filePath) {
-        try {
-            return FileIOUtils.readFileUtf8(new File(filePath));
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
-        }
-    }
-}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFAuthProvider.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFAuthProvider.java
new file mode 100644
index 0000000000..7b70cb3545
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFAuthProvider.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.auth;
+
+import org.apache.paimon.utils.FileIOUtils;
+
+import okhttp3.MediaType;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.paimon.rest.RESTObjectMapper.OBJECT_MAPPER;
+
+/** Auth provider for <b>Ali CLoud</b> DLF. */
+public class DLFAuthProvider implements AuthProvider {
+
+    public static final String DLF_HOST_HEADER_KEY = "Host";
+    public static final String DLF_AUTHORIZATION_HEADER_KEY = "Authorization";
+    public static final String DLF_CONTENT_MD5_HEADER_KEY = "Content-MD5";
+    public static final String DLF_CONTENT_TYPE_KEY = "Content-Type";
+    public static final String DLF_DATE_HEADER_KEY = "x-dlf-date";
+    public static final String DLF_SECURITY_TOKEN_HEADER_KEY = 
"x-dlf-security-token";
+    public static final String DLF_AUTH_VERSION_HEADER_KEY = "x-dlf-version";
+    public static final String DLF_CONTENT_SHA56_HEADER_KEY = 
"x-dlf-content-sha256";
+    public static final String DLF_CONTENT_SHA56_VALUE = "UNSIGNED-PAYLOAD";
+    public static final double EXPIRED_FACTOR = 0.4;
+    public static final DateTimeFormatter TOKEN_DATE_FORMATTER =
+            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'");
+    public static final DateTimeFormatter AUTH_DATE_TIME_FORMATTER =
+            DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmss'Z'");
+    public static final DateTimeFormatter AUTH_DATE_FORMATTER =
+            DateTimeFormatter.ofPattern("yyyyMMdd");
+    protected static final MediaType MEDIA_TYPE = 
MediaType.parse("application/json");
+    private static final long[] READ_TOKEN_FILE_BACKOFF_WAIT_TIME_MILLIS = 
{1_000, 3_000, 5_000};
+
+    private final String tokenFilePath;
+
+    protected DLFToken token;
+    private final boolean keepRefreshed;
+    private Long expiresAtMillis;
+    private final Long tokenRefreshInMills;
+    private final String region;
+
+    public static DLFAuthProvider buildRefreshToken(
+            String tokenFilePath, Long tokenRefreshInMills, String region) {
+        DLFToken token = readToken(tokenFilePath, 0);
+        Long expiresAtMillis = getExpirationInMills(token.getExpiration());
+        return new DLFAuthProvider(
+                tokenFilePath, token, true, expiresAtMillis, 
tokenRefreshInMills, region);
+    }
+
+    public static DLFAuthProvider buildAKToken(
+            String accessKeyId, String accessKeySecret, String securityToken, 
String region) {
+        DLFToken token = new DLFToken(accessKeyId, accessKeySecret, 
securityToken, null);
+        return new DLFAuthProvider(null, token, false, null, null, region);
+    }
+
+    public DLFAuthProvider(
+            String tokenFilePath,
+            DLFToken token,
+            boolean keepRefreshed,
+            Long expiresAtMillis,
+            Long tokenRefreshInMills,
+            String region) {
+        this.tokenFilePath = tokenFilePath;
+        this.token = token;
+        this.keepRefreshed = keepRefreshed;
+        this.expiresAtMillis = expiresAtMillis;
+        this.tokenRefreshInMills = tokenRefreshInMills;
+        this.region = region;
+    }
+
+    @Override
+    public Map<String, String> header(
+            Map<String, String> baseHeader, RESTAuthParameter 
restAuthParameter) {
+        try {
+            ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
+            String date = now.format(AUTH_DATE_FORMATTER);
+            String dateTime = now.format(AUTH_DATE_TIME_FORMATTER);
+            Map<String, String> signHeaders =
+                    generateSignHeaders(
+                            restAuthParameter.host(),
+                            restAuthParameter.data(),
+                            dateTime,
+                            token.getSecurityToken());
+            String authorization =
+                    DLFAuthSignature.getAuthorization(
+                            restAuthParameter, token, region, signHeaders, 
dateTime, date);
+            Map<String, String> headersWithAuth = new HashMap<>(baseHeader);
+            headersWithAuth.putAll(signHeaders);
+            headersWithAuth.put(DLF_AUTHORIZATION_HEADER_KEY, authorization);
+            return headersWithAuth;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static Map<String, String> generateSignHeaders(
+            String host, String data, String dateTime, String securityToken) 
throws Exception {
+        Map<String, String> signHeaders = new HashMap<>();
+        signHeaders.put(DLF_DATE_HEADER_KEY, dateTime);
+        signHeaders.put(DLF_HOST_HEADER_KEY, host);
+        signHeaders.put(DLF_CONTENT_SHA56_HEADER_KEY, DLF_CONTENT_SHA56_VALUE);
+        signHeaders.put(DLF_AUTH_VERSION_HEADER_KEY, DLFAuthSignature.VERSION);
+        if (data != null && !data.isEmpty()) {
+            signHeaders.put(DLF_CONTENT_TYPE_KEY, MEDIA_TYPE.toString());
+            signHeaders.put(DLF_CONTENT_MD5_HEADER_KEY, 
DLFAuthSignature.md5(data));
+        }
+        if (securityToken != null) {
+            signHeaders.put(DLF_SECURITY_TOKEN_HEADER_KEY, securityToken);
+        }
+        return signHeaders;
+    }
+
+    @Override
+    public boolean refresh() {
+        long start = System.currentTimeMillis();
+        DLFToken newToken = readToken(tokenFilePath, 0);
+        if (newToken == null) {
+            return false;
+        }
+        this.expiresAtMillis = start + this.tokenRefreshInMills;
+        this.token = newToken;
+        return true;
+    }
+
+    @Override
+    public boolean keepRefreshed() {
+        return this.keepRefreshed;
+    }
+
+    @Override
+    public boolean willSoonExpire() {
+        if (keepRefreshed()) {
+            return expiresAtMillis().get() - System.currentTimeMillis()
+                    < tokenRefreshInMills().get() * EXPIRED_FACTOR;
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public Optional<Long> expiresAtMillis() {
+        return Optional.ofNullable(this.expiresAtMillis);
+    }
+
+    @Override
+    public Optional<Long> tokenRefreshInMills() {
+        return Optional.ofNullable(this.tokenRefreshInMills);
+    }
+
+    protected static DLFToken readToken(String tokenFilePath, int retryTimes) {
+        try {
+            File tokenFile = new File(tokenFilePath);
+            if (tokenFile.exists()) {
+                String tokenStr = FileIOUtils.readFileUtf8(tokenFile);
+                return OBJECT_MAPPER.readValue(tokenStr, DLFToken.class);
+            } else if (retryTimes < 
READ_TOKEN_FILE_BACKOFF_WAIT_TIME_MILLIS.length - 1) {
+                
Thread.sleep(READ_TOKEN_FILE_BACKOFF_WAIT_TIME_MILLIS[retryTimes]);
+                return readToken(tokenFilePath, retryTimes + 1);
+            } else {
+                throw new FileNotFoundException(tokenFilePath);
+            }
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static Long getExpirationInMills(String dateStr) {
+        try {
+            if (dateStr == null) {
+                return null;
+            }
+            LocalDateTime dateTime = LocalDateTime.parse(dateStr, 
TOKEN_DATE_FORMATTER);
+            return dateTime.atZone(ZoneOffset.UTC).toInstant().toEpochMilli();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFAuthProviderFactory.java
 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFAuthProviderFactory.java
new file mode 100644
index 0000000000..a78c44dea2
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFAuthProviderFactory.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.auth;
+
+import org.apache.paimon.options.Options;
+import org.apache.paimon.rest.RESTCatalogOptions;
+
+import static org.apache.paimon.rest.RESTCatalogOptions.DLF_TOKEN_PATH;
+import static org.apache.paimon.rest.RESTCatalogOptions.TOKEN_REFRESH_TIME;
+import static org.apache.paimon.rest.RESTCatalogOptions.URI;
+
+/** Factory for {@link DLFAuthProvider}. */
+public class DLFAuthProviderFactory implements AuthProviderFactory {
+
+    @Override
+    public String identifier() {
+        return AuthProviderEnum.DLF.identifier();
+    }
+
+    @Override
+    public AuthProvider create(Options options) {
+        String region = getRegion(options);
+        if 
(options.getOptional(RESTCatalogOptions.DLF_TOKEN_PATH).isPresent()) {
+            String tokenFilePath = options.get(DLF_TOKEN_PATH);
+            long tokenRefreshInMills = 
options.get(TOKEN_REFRESH_TIME).toMillis();
+            return DLFAuthProvider.buildRefreshToken(tokenFilePath, 
tokenRefreshInMills, region);
+        } else if 
(options.getOptional(RESTCatalogOptions.DLF_ACCESS_KEY_ID).isPresent()
+                && 
options.getOptional(RESTCatalogOptions.DLF_ACCESS_KEY_SECRET).isPresent()) {
+            return DLFAuthProvider.buildAKToken(
+                    options.get(RESTCatalogOptions.DLF_ACCESS_KEY_ID),
+                    options.get(RESTCatalogOptions.DLF_ACCESS_KEY_SECRET),
+                    options.get(RESTCatalogOptions.DLF_SECURITY_TOKEN),
+                    region);
+        }
+        throw new IllegalArgumentException("DLF token path or AK must be set 
for DLF Auth.");
+    }
+
+    private static String getRegion(Options options) {
+        String region = "undefined";
+        try {
+            String[] paths = options.get(URI).split("\\.");
+            if (paths.length > 1) {
+                region = paths[1];
+            }
+        } catch (Exception ignore) {
+        }
+        return region;
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFAuthSignature.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFAuthSignature.java
new file mode 100644
index 0000000000..b138d38c6c
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFAuthSignature.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.auth;
+
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.paimon.shade.guava30.com.google.common.base.Joiner;
+
+import javax.crypto.Mac;
+import javax.crypto.spec.SecretKeySpec;
+
+import java.security.MessageDigest;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static 
org.apache.paimon.rest.auth.DLFAuthProvider.DLF_AUTH_VERSION_HEADER_KEY;
+import static 
org.apache.paimon.rest.auth.DLFAuthProvider.DLF_CONTENT_MD5_HEADER_KEY;
+import static 
org.apache.paimon.rest.auth.DLFAuthProvider.DLF_CONTENT_SHA56_HEADER_KEY;
+import static org.apache.paimon.rest.auth.DLFAuthProvider.DLF_CONTENT_TYPE_KEY;
+import static org.apache.paimon.rest.auth.DLFAuthProvider.DLF_DATE_HEADER_KEY;
+import static org.apache.paimon.rest.auth.DLFAuthProvider.DLF_HOST_HEADER_KEY;
+import static 
org.apache.paimon.rest.auth.DLFAuthProvider.DLF_SECURITY_TOKEN_HEADER_KEY;
+
+/** generate authorization for <b>Ali CLoud</b> DLF. */
+public class DLFAuthSignature {
+
+    public static final String VERSION = "v1";
+
+    private static final String SIGNATURE_ALGORITHM = "DLF4-HMAC-SHA256";
+    private static final String PRODUCT = "DlfNext";
+    private static final String HMAC_SHA256 = "HmacSHA256";
+    private static final String REQUEST_TYPE = "aliyun_v4_request";
+    private static final String ADDITIONAL_HEADERS_KEY = "AdditionalHeaders";
+    private static final String SIGNATURE_KEY = "Signature";
+    private static final String NEW_LINE = "\n";
+    private static final List<String> SIGNED_HEADERS =
+            Arrays.asList(
+                    DLF_CONTENT_MD5_HEADER_KEY.toLowerCase(),
+                    DLF_CONTENT_TYPE_KEY.toLowerCase(),
+                    DLF_CONTENT_SHA56_HEADER_KEY.toLowerCase(),
+                    DLF_DATE_HEADER_KEY.toLowerCase(),
+                    DLF_AUTH_VERSION_HEADER_KEY.toLowerCase(),
+                    DLF_SECURITY_TOKEN_HEADER_KEY.toLowerCase());
+    // must be ordered by alphabetical
+    private static final List<String> ADDITIONAL_HEADERS =
+            Arrays.asList(DLF_HOST_HEADER_KEY.toLowerCase());
+
+    public static String getAuthorization(
+            RESTAuthParameter restAuthParameter,
+            DLFToken dlfToken,
+            String region,
+            Map<String, String> headers,
+            String dateTime,
+            String date)
+            throws Exception {
+        String canonicalRequest = getCanonicalRequest(restAuthParameter, 
headers);
+        String stringToSign =
+                Joiner.on(NEW_LINE)
+                        .join(
+                                SIGNATURE_ALGORITHM,
+                                dateTime,
+                                String.format("%s/%s/%s/%s", date, region, 
PRODUCT, REQUEST_TYPE),
+                                sha256Hex(canonicalRequest));
+        byte[] dateKey = hmacSha256(("aliyun_v4" + 
dlfToken.getAccessKeySecret()).getBytes(), date);
+        byte[] dateRegionKey = hmacSha256(dateKey, region);
+        byte[] dateRegionServiceKey = hmacSha256(dateRegionKey, PRODUCT);
+        byte[] signingKey = hmacSha256(dateRegionServiceKey, REQUEST_TYPE);
+        byte[] result = hmacSha256(signingKey, stringToSign);
+        String signature = hexEncode(result);
+        String authorization =
+                Joiner.on(",")
+                        .join(
+                                String.format(
+                                        "%s Credential=%s/%s/%s/%s/%s",
+                                        SIGNATURE_ALGORITHM,
+                                        dlfToken.getAccessKeyId(),
+                                        date,
+                                        region,
+                                        PRODUCT,
+                                        REQUEST_TYPE),
+                                String.format(
+                                        "%s=%s",
+                                        ADDITIONAL_HEADERS_KEY,
+                                        
Joiner.on(",").join(ADDITIONAL_HEADERS)),
+                                String.format("%s=%s", SIGNATURE_KEY, 
signature));
+        return authorization;
+    }
+
+    public static String md5(String raw) throws Exception {
+        MessageDigest messageDigest = MessageDigest.getInstance("MD5");
+        messageDigest.update(raw.getBytes(UTF_8.name()));
+        byte[] md5 = messageDigest.digest();
+        return new String(Base64.getEncoder().encodeToString(md5));
+    }
+
+    private static byte[] hmacSha256(byte[] key, String data) {
+        try {
+            SecretKeySpec secretKeySpec = new SecretKeySpec(key, HMAC_SHA256);
+            Mac mac = Mac.getInstance(HMAC_SHA256);
+            mac.init(secretKeySpec);
+            byte[] hmacBytes = mac.doFinal(data.getBytes());
+            return hmacBytes;
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to calculate HMAC-SHA256", e);
+        }
+    }
+
+    public static String getCanonicalRequest(
+            RESTAuthParameter restAuthParameter, Map<String, String> headers) 
throws Exception {
+        String canonicalRequest =
+                Joiner.on(NEW_LINE)
+                        .join(restAuthParameter.method(), 
restAuthParameter.resourcePath());
+        // Canonical Query String + "\n" +
+        TreeMap<String, String> orderMap = new TreeMap<>();
+        if (restAuthParameter.parameters() != null) {
+            orderMap.putAll(restAuthParameter.parameters());
+        }
+        String separator = "";
+        StringBuilder canonicalPart = new StringBuilder();
+        for (Map.Entry<String, String> param : orderMap.entrySet()) {
+            
canonicalPart.append(separator).append(StringUtils.trim(param.getKey()));
+            if (param.getValue() != null && !param.getValue().isEmpty()) {
+                
canonicalPart.append("=").append((StringUtils.trim(param.getValue())));
+            }
+            separator = "&";
+        }
+        canonicalRequest = Joiner.on(NEW_LINE).join(canonicalRequest, 
canonicalPart);
+
+        // Canonical Headers + "\n" +
+        TreeMap<String, String> sortedSignedHeadersMap = 
buildSortedSignedHeadersMap(headers);
+        for (Map.Entry<String, String> header : 
sortedSignedHeadersMap.entrySet()) {
+            canonicalRequest =
+                    Joiner.on(NEW_LINE)
+                            .join(
+                                    canonicalRequest,
+                                    String.format("%s:%s", header.getKey(), 
header.getValue()));
+        }
+
+        // Additional Headers + "\n" +
+        String additionalSignedHeaders = 
Joiner.on(";").join(ADDITIONAL_HEADERS);
+        canonicalRequest = Joiner.on(NEW_LINE).join(canonicalRequest, 
additionalSignedHeaders);
+        String contentSha56 =
+                headers.getOrDefault(
+                        DLF_CONTENT_SHA56_HEADER_KEY, 
DLFAuthProvider.DLF_CONTENT_SHA56_VALUE);
+        return Joiner.on(NEW_LINE).join(canonicalRequest, contentSha56);
+    }
+
+    private static TreeMap<String, String> buildSortedSignedHeadersMap(
+            Map<String, String> headers) {
+        TreeMap<String, String> orderMap = new TreeMap<>();
+        if (headers != null) {
+            for (Map.Entry<String, String> header : headers.entrySet()) {
+                String key = header.getKey().toLowerCase();
+                if (SIGNED_HEADERS.contains(key) || 
ADDITIONAL_HEADERS.contains(key)) {
+                    orderMap.put(key, StringUtils.trim(header.getValue()));
+                }
+            }
+        }
+        return orderMap;
+    }
+
+    private static String sha256Hex(String raw) throws Exception {
+        MessageDigest digest = MessageDigest.getInstance("SHA-256");
+        byte[] hash = digest.digest(raw.getBytes(UTF_8.name()));
+        return hexEncode(hash);
+    }
+
+    private static String hexEncode(byte[] raw) {
+        if (raw == null) {
+            return null;
+        } else {
+            StringBuilder sb = new StringBuilder();
+
+            for (byte b : raw) {
+                String hex = Integer.toHexString(b & 255);
+                if (hex.length() < 2) {
+                    sb.append(0);
+                }
+
+                sb.append(hex);
+            }
+
+            return sb.toString();
+        }
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFToken.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFToken.java
new file mode 100644
index 0000000000..c961dfe38b
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFToken.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.auth;
+
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+/** <b>Ali CLoud</b> DLF Token. */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class DLFToken {
+
+    private static final String ACCESS_KEY_ID_FIELD_NAME = "AccessKeyId";
+    private static final String ACCESS_KEY_SECRET_FIELD_NAME = 
"AccessKeySecret";
+    private static final String SECURITY_TOKEN_FIELD_NAME = "SecurityToken";
+    private static final String EXPIRATION_FIELD_NAME = "Expiration";
+
+    @JsonProperty(ACCESS_KEY_ID_FIELD_NAME)
+    private final String accessKeyId;
+
+    @JsonProperty(ACCESS_KEY_SECRET_FIELD_NAME)
+    private final String accessKeySecret;
+
+    @JsonProperty(SECURITY_TOKEN_FIELD_NAME)
+    private final String securityToken;
+
+    @JsonProperty(EXPIRATION_FIELD_NAME)
+    private final String expiration;
+
+    @JsonCreator
+    public DLFToken(
+            @JsonProperty(ACCESS_KEY_ID_FIELD_NAME) String accessKeyId,
+            @JsonProperty(ACCESS_KEY_SECRET_FIELD_NAME) String accessKeySecret,
+            @JsonProperty(SECURITY_TOKEN_FIELD_NAME) String securityToken,
+            @JsonProperty(EXPIRATION_FIELD_NAME) String expiration) {
+        this.accessKeyId = accessKeyId;
+        this.accessKeySecret = accessKeySecret;
+        this.securityToken = securityToken;
+        this.expiration = expiration;
+    }
+
+    public String getAccessKeyId() {
+        return accessKeyId;
+    }
+
+    public String getAccessKeySecret() {
+        return accessKeySecret;
+    }
+
+    public String getSecurityToken() {
+        return securityToken;
+    }
+
+    public String getExpiration() {
+        return expiration;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        DLFToken that = (DLFToken) o;
+        return Objects.equals(accessKeyId, that.accessKeyId)
+                && Objects.equals(accessKeySecret, that.accessKeySecret)
+                && Objects.equals(securityToken, that.securityToken)
+                && Objects.equals(expiration, that.expiration);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(accessKeyId, accessKeySecret, securityToken, 
expiration);
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenAuthProvider.java
 b/paimon-core/src/main/java/org/apache/paimon/rest/auth/RESTAuthFunction.java
similarity index 56%
copy from 
paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenAuthProvider.java
copy to 
paimon-core/src/main/java/org/apache/paimon/rest/auth/RESTAuthFunction.java
index 73e5081a5e..b76701845f 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenAuthProvider.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/RESTAuthFunction.java
@@ -18,33 +18,25 @@
 
 package org.apache.paimon.rest.auth;
 
-import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
-
 import java.util.Map;
+import java.util.function.Function;
 
-/** Auth provider for bear token. */
-public class BearTokenAuthProvider implements AuthProvider {
-
-    private static final String AUTHORIZATION_HEADER = "Authorization";
-    private static final String BEARER_PREFIX = "Bearer ";
+/** The function used to generate auth header for the rest request. */
+public class RESTAuthFunction implements Function<RESTAuthParameter, 
Map<String, String>> {
 
-    protected String token;
-
-    public BearTokenAuthProvider(String token) {
-        this.token = token;
-    }
+    private final Map<String, String> initHeader;
+    private final AuthSession authSession;
 
-    public String token() {
-        return token;
-    }
-
-    @Override
-    public Map<String, String> authHeader() {
-        return ImmutableMap.of(AUTHORIZATION_HEADER, BEARER_PREFIX + token);
+    public RESTAuthFunction(Map<String, String> initHeader, AuthSession 
authSession) {
+        this.initHeader = initHeader;
+        this.authSession = authSession;
     }
 
     @Override
-    public boolean refresh() {
-        return true;
+    public Map<String, String> apply(RESTAuthParameter restAuthParameter) {
+        if (authSession != null) {
+            return authSession.getAuthProvider().header(initHeader, 
restAuthParameter);
+        }
+        return initHeader;
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/RESTAuthParameter.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/RESTAuthParameter.java
new file mode 100644
index 0000000000..f0bbf50495
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/RESTAuthParameter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.auth;
+
+import java.util.Map;
+
+/** RestAuthParameter for building rest auth header. */
+public class RESTAuthParameter {
+
+    private final String host;
+    private final String resourcePath;
+    private final Map<String, String> parameters;
+    private final String method;
+    private final String data;
+
+    public RESTAuthParameter(
+            String host,
+            String resourcePath,
+            Map<String, String> parameters,
+            String method,
+            String data) {
+        this.host = host;
+        this.resourcePath = resourcePath;
+        this.parameters = parameters;
+        this.method = method;
+        this.data = data;
+    }
+
+    public String host() {
+        return host;
+    }
+
+    public String resourcePath() {
+        return resourcePath;
+    }
+
+    public Map<String, String> parameters() {
+        return parameters;
+    }
+
+    public String method() {
+        return method;
+    }
+
+    public String data() {
+        return data;
+    }
+}
diff --git 
a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
 
b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 7f221e5172..81fc475d76 100644
--- 
a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ 
b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -38,3 +38,5 @@ 
org.apache.paimon.mergetree.compact.aggregate.factory.FieldSumAggFactory
 
org.apache.paimon.mergetree.compact.aggregate.factory.FieldThetaSketchAggFactory
 org.apache.paimon.rest.RESTCatalogFactory
 org.apache.paimon.iceberg.migrate.IcebergMigrateHadoopMetadataFactory
+org.apache.paimon.rest.auth.BearTokenAuthProviderFactory
+org.apache.paimon.rest.auth.DLFAuthProviderFactory
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java
index deaf6da818..8600637c88 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java
@@ -19,10 +19,15 @@
 package org.apache.paimon.rest;
 
 import org.apache.paimon.rest.auth.AuthProvider;
+import org.apache.paimon.rest.auth.AuthSession;
 import org.apache.paimon.rest.auth.BearTokenAuthProvider;
+import org.apache.paimon.rest.auth.RESTAuthFunction;
 import org.apache.paimon.rest.exceptions.BadRequestException;
 import org.apache.paimon.rest.responses.ErrorResponse;
 import org.apache.paimon.rest.responses.ErrorResponseResourceType;
+import org.apache.paimon.utils.Pair;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
 
 import org.junit.After;
 import org.junit.Before;
@@ -30,6 +35,8 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
@@ -49,6 +56,7 @@ public class HttpClientTest {
     private String mockResponseDataStr;
     private String errorResponseStr;
     private Map<String, String> headers;
+    private RESTAuthFunction restAuthFunction;
 
     @Before
     public void setUp() throws Exception {
@@ -65,7 +73,9 @@ public class HttpClientTest {
         httpClient = new HttpClient(httpClientOptions);
         httpClient.setErrorHandler(errorHandler);
         AuthProvider authProvider = new BearTokenAuthProvider(TOKEN);
-        headers = authProvider.authHeader();
+        AuthSession authSession = new AuthSession(authProvider);
+        headers = new HashMap<>();
+        restAuthFunction = new RESTAuthFunction(headers, authSession);
     }
 
     @After
@@ -76,7 +86,7 @@ public class HttpClientTest {
     @Test
     public void testGetSuccess() {
         server.enqueueResponse(mockResponseDataStr, 200);
-        MockRESTData response = httpClient.get(MOCK_PATH, MockRESTData.class, 
headers);
+        MockRESTData response = httpClient.get(MOCK_PATH, MockRESTData.class, 
restAuthFunction);
         assertEquals(mockResponseData.data(), response.data());
     }
 
@@ -85,14 +95,14 @@ public class HttpClientTest {
         server.enqueueResponse(errorResponseStr, 400);
         assertThrows(
                 BadRequestException.class,
-                () -> httpClient.get(MOCK_PATH, MockRESTData.class, headers));
+                () -> httpClient.get(MOCK_PATH, MockRESTData.class, 
restAuthFunction));
     }
 
     @Test
     public void testPostSuccess() {
         server.enqueueResponse(mockResponseDataStr, 200);
         MockRESTData response =
-                httpClient.post(MOCK_PATH, mockResponseData, 
MockRESTData.class, headers);
+                httpClient.post(MOCK_PATH, mockResponseData, 
MockRESTData.class, restAuthFunction);
         assertEquals(mockResponseData.data(), response.data());
     }
 
@@ -101,19 +111,25 @@ public class HttpClientTest {
         server.enqueueResponse(errorResponseStr, 400);
         assertThrows(
                 BadRequestException.class,
-                () -> httpClient.post(MOCK_PATH, mockResponseData, 
ErrorResponse.class, headers));
+                () ->
+                        httpClient.post(
+                                MOCK_PATH,
+                                mockResponseData,
+                                ErrorResponse.class,
+                                restAuthFunction));
     }
 
     @Test
     public void testDeleteSuccess() {
         server.enqueueResponse(mockResponseDataStr, 200);
-        assertDoesNotThrow(() -> httpClient.delete(MOCK_PATH, headers));
+        assertDoesNotThrow(() -> httpClient.delete(MOCK_PATH, 
restAuthFunction));
     }
 
     @Test
     public void testDeleteFail() {
         server.enqueueResponse(errorResponseStr, 400);
-        assertThrows(BadRequestException.class, () -> 
httpClient.delete(MOCK_PATH, headers));
+        assertThrows(
+                BadRequestException.class, () -> httpClient.delete(MOCK_PATH, 
restAuthFunction));
     }
 
     @Test
@@ -124,6 +140,22 @@ public class HttpClientTest {
                                 server.getBaseUrl(), Duration.ofSeconds(30), 
1, 10, 2));
         server.enqueueResponse(mockResponseDataStr, 429);
         server.enqueueResponse(mockResponseDataStr, 200);
-        assertDoesNotThrow(() -> httpClient.get(MOCK_PATH, MockRESTData.class, 
headers));
+        assertDoesNotThrow(() -> httpClient.get(MOCK_PATH, MockRESTData.class, 
restAuthFunction));
+    }
+
+    @Test
+    public void testParsePath() {
+        assertEquals(
+                Pair.of("/api/v1/tables", Collections.emptyMap()),
+                HttpClient.parsePath("/api/v1/tables"));
+        assertEquals(
+                Pair.of("/api/v1/tables/my_table$schemas", 
Collections.emptyMap()),
+                HttpClient.parsePath("/api/v1/tables/my_table$schemas"));
+        assertEquals(
+                Pair.of("/api/v1/tables", ImmutableMap.of("pageSize", "10", 
"pageNum", "1")),
+                HttpClient.parsePath("/api/v1/tables?pageSize=10&pageNum=1"));
+        assertEquals(
+                Pair.of("/api/v1/tables", ImmutableMap.of("tableName", 
"t1,t2")),
+                HttpClient.parsePath("/api/v1/tables?tableName=t1,t2"));
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index f29bc3eda2..e9b5b7f0a0 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -28,6 +28,7 @@ import org.apache.paimon.operation.Lock;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.Partition;
+import org.apache.paimon.rest.auth.BearTokenAuthProvider;
 import org.apache.paimon.rest.requests.AlterPartitionsRequest;
 import org.apache.paimon.rest.requests.AlterTableRequest;
 import org.apache.paimon.rest.requests.CommitTableRequest;
@@ -112,7 +113,8 @@ public class RESTCatalogServer {
         return new Dispatcher() {
             @Override
             public MockResponse dispatch(RecordedRequest request) {
-                String token = request.getHeaders().get("Authorization");
+                String token =
+                        
request.getHeaders().get(BearTokenAuthProvider.AUTHORIZATION_HEADER_KEY);
                 RESTResponse response;
                 try {
                     if (!("Bearer " + authToken).equals(token)) {
@@ -591,10 +593,12 @@ public class RESTCatalogServer {
 
     private static String getConfigBody(String warehouseStr) {
         return String.format(
-                "{\"defaults\": {\"%s\": \"%s\", \"%s\": \"%s\"}}",
+                "{\"defaults\": {\"%s\": \"%s\", \"%s\": \"%s\", \"%s\": 
\"%s\"}}",
                 RESTCatalogInternalOptions.PREFIX.key(),
                 PREFIX,
                 CatalogOptions.WAREHOUSE.key(),
-                warehouseStr);
+                warehouseStr,
+                "header.test-header",
+                "test-value");
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index 1fb4dcbcd7..64389378bf 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -24,6 +24,9 @@ import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.Partition;
+import org.apache.paimon.rest.auth.AuthProviderEnum;
+import org.apache.paimon.rest.auth.BearTokenAuthProvider;
+import org.apache.paimon.rest.auth.RESTAuthParameter;
 import org.apache.paimon.rest.exceptions.NotAuthorizedException;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.FileStoreTable;
@@ -39,6 +42,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -61,6 +65,7 @@ class RESTCatalogTest extends CatalogTestBase {
         Options options = new Options();
         options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl());
         options.set(RESTCatalogOptions.TOKEN, initToken);
+        options.set(RESTCatalogOptions.TOKEN_PROVIDER, 
AuthProviderEnum.BEAR.identifier());
         options.set(RESTCatalogOptions.THREAD_POOL_SIZE, 1);
         this.catalog = new RESTCatalog(CatalogContext.create(options));
     }
@@ -74,6 +79,7 @@ class RESTCatalogTest extends CatalogTestBase {
     void testInitFailWhenDefineWarehouse() {
         Options options = new Options();
         options.set(CatalogOptions.WAREHOUSE, warehouse);
+        options.set(RESTCatalogOptions.TOKEN_PROVIDER, 
AuthProviderEnum.BEAR.identifier());
         assertThatThrownBy(() -> new 
RESTCatalog(CatalogContext.create(options)))
                 .isInstanceOf(IllegalArgumentException.class);
     }
@@ -83,12 +89,27 @@ class RESTCatalogTest extends CatalogTestBase {
         Options options = new Options();
         options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl());
         options.set(RESTCatalogOptions.TOKEN, "aaaaa");
+        options.set(RESTCatalogOptions.TOKEN_PROVIDER, 
AuthProviderEnum.BEAR.identifier());
         options.set(RESTCatalogOptions.THREAD_POOL_SIZE, 1);
         options.set(CatalogOptions.METASTORE, RESTCatalogFactory.IDENTIFIER);
         assertThatThrownBy(() -> new 
RESTCatalog(CatalogContext.create(options)))
                 .isInstanceOf(NotAuthorizedException.class);
     }
 
+    @Test
+    void testHeader() {
+        RESTCatalog restCatalog = (RESTCatalog) catalog;
+        Map<String, String> parameters = new HashMap<>();
+        parameters.put("k1", "v1");
+        parameters.put("k2", "v2");
+        RESTAuthParameter restAuthParameter =
+                new RESTAuthParameter("host", "/path", parameters, "method", 
"data");
+        Map<String, String> headers = restCatalog.headers(restAuthParameter);
+        assertEquals(
+                headers.get(BearTokenAuthProvider.AUTHORIZATION_HEADER_KEY), 
"Bearer init_token");
+        assertEquals(headers.get("test-header"), "test-value");
+    }
+
     @Test
     void testListPartitionsWhenMetastorePartitionedIsTrue() throws Exception {
         Identifier identifier = Identifier.create("test_db", "test_table");
@@ -115,6 +136,7 @@ class RESTCatalogTest extends CatalogTestBase {
         options.set(RESTCatalogOptions.TOKEN, initToken);
         options.set(RESTCatalogOptions.THREAD_POOL_SIZE, 1);
         options.set(RESTCatalogOptions.DATA_TOKEN_ENABLED, true);
+        options.set(RESTCatalogOptions.TOKEN_PROVIDER, 
AuthProviderEnum.BEAR.identifier());
         this.catalog = new RESTCatalog(CatalogContext.create(options));
         List<Identifier> identifiers =
                 Lists.newArrayList(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthProviderTest.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthProviderTest.java
deleted file mode 100644
index 5fdb1a9c40..0000000000
--- 
a/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthProviderTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.rest.auth;
-
-import org.apache.paimon.options.Options;
-import org.apache.paimon.rest.RESTCatalogOptions;
-
-import org.apache.commons.io.FileUtils;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.time.Duration;
-import java.util.UUID;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThrows;
-
-/** Test for {@link AuthProvider}. */
-public class AuthProviderTest {
-
-    @Rule public TemporaryFolder folder = new TemporaryFolder();
-
-    @Test
-    public void testCreateBearTokenSuccess() {
-        Options options = new Options();
-        String token = UUID.randomUUID().toString();
-        options.set(RESTCatalogOptions.TOKEN, token);
-        BearTokenAuthProvider authProvider = (BearTokenAuthProvider) 
AuthProvider.create(options);
-        assertEquals(token, authProvider.token());
-    }
-
-    @Test
-    public void testCreateBearTokenFail() {
-        Options options = new Options();
-        assertThrows(IllegalArgumentException.class, () -> 
AuthProvider.create(options));
-    }
-
-    @Test
-    public void testCreateBearTokenFileSuccess() throws Exception {
-        Options options = new Options();
-        String fileName = "token";
-        File tokenFile = folder.newFile(fileName);
-        String token = UUID.randomUUID().toString();
-        FileUtils.writeStringToFile(tokenFile, token);
-        options.set(RESTCatalogOptions.TOKEN_PROVIDER_PATH, 
tokenFile.getPath());
-        BearTokenFileAuthProvider authProvider =
-                (BearTokenFileAuthProvider) AuthProvider.create(options);
-        assertEquals(token, authProvider.token());
-    }
-
-    @Test
-    public void testCreateRefreshBearTokenFileSuccess() throws Exception {
-        Options options = new Options();
-        String fileName = "token";
-        File tokenFile = folder.newFile(fileName);
-        String token = UUID.randomUUID().toString();
-        FileUtils.writeStringToFile(tokenFile, token);
-        options.set(RESTCatalogOptions.TOKEN_PROVIDER_PATH, 
tokenFile.getPath());
-        options.set(RESTCatalogOptions.TOKEN_EXPIRATION_TIME, 
Duration.ofSeconds(10L));
-        BearTokenFileAuthProvider authProvider =
-                (BearTokenFileAuthProvider) AuthProvider.create(options);
-        assertEquals(token, authProvider.token());
-    }
-}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthSessionTest.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthSessionTest.java
index b54c3103ed..e20ff5cb1d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthSessionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthSessionTest.java
@@ -18,9 +18,13 @@
 
 package org.apache.paimon.rest.auth;
 
+import org.apache.paimon.options.Options;
+import org.apache.paimon.rest.RESTCatalogOptions;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.ThreadPoolUtils;
 
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
 import org.apache.commons.io.FileUtils;
 import org.junit.Rule;
 import org.junit.Test;
@@ -29,16 +33,29 @@ import org.mockito.Mockito;
 
 import java.io.File;
 import java.io.IOException;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.ScheduledExecutorService;
 
+import static org.apache.paimon.rest.RESTCatalogOptions.DLF_ACCESS_KEY_ID;
+import static org.apache.paimon.rest.RESTCatalogOptions.DLF_ACCESS_KEY_SECRET;
+import static org.apache.paimon.rest.RESTCatalogOptions.DLF_SECURITY_TOKEN;
+import static org.apache.paimon.rest.RESTCatalogOptions.DLF_TOKEN_PATH;
+import static org.apache.paimon.rest.RESTCatalogOptions.TOKEN;
+import static org.apache.paimon.rest.RESTCatalogOptions.TOKEN_REFRESH_TIME;
 import static 
org.apache.paimon.rest.auth.AuthSession.MAX_REFRESH_WINDOW_MILLIS;
 import static org.apache.paimon.rest.auth.AuthSession.MIN_REFRESH_WAIT_MILLIS;
 import static org.apache.paimon.rest.auth.AuthSession.REFRESH_NUM_RETRIES;
+import static 
org.apache.paimon.rest.auth.DLFAuthProvider.DLF_AUTHORIZATION_HEADER_KEY;
+import static org.apache.paimon.rest.auth.DLFAuthProvider.DLF_DATE_HEADER_KEY;
+import static org.apache.paimon.rest.auth.DLFAuthProvider.TOKEN_DATE_FORMATTER;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -46,6 +63,7 @@ import static org.mockito.Mockito.when;
 public class AuthSessionTest {
 
     @Rule public TemporaryFolder folder = new TemporaryFolder();
+    private static final ObjectMapper OBJECT_MAPPER_INSTANCE = new 
ObjectMapper();
 
     @Test
     public void testBearToken() {
@@ -53,78 +71,72 @@ public class AuthSessionTest {
         Map<String, String> initialHeaders = new HashMap<>();
         initialHeaders.put("k1", "v1");
         initialHeaders.put("k2", "v2");
-        AuthProvider authProvider = new BearTokenAuthProvider(token);
-        AuthSession session = new AuthSession(initialHeaders, authProvider);
-        Map<String, String> header = session.getHeaders();
-        assertEquals(header.get("Authorization"), "Bearer " + token);
-        assertEquals(header.get("k1"), "v1");
-        for (Map.Entry<String, String> entry : initialHeaders.entrySet()) {
-            assertEquals(entry.getValue(), header.get(entry.getKey()));
-        }
-        assertEquals(header.size(), initialHeaders.size() + 1);
+        Options options = new Options();
+        options.set(TOKEN.key(), token);
+        AuthProvider authProvider =
+                
AuthProviderFactory.createAuthProvider(AuthProviderEnum.BEAR.identifier(), 
options);
+        AuthSession session = new AuthSession(authProvider);
+        Map<String, String> headers = 
session.getAuthProvider().header(initialHeaders, null);
+        assertEquals(
+                headers.get(BearTokenAuthProvider.AUTHORIZATION_HEADER_KEY), 
"Bearer " + token);
     }
 
     @Test
-    public void testRefreshBearTokenFileAuthProvider() throws IOException, 
InterruptedException {
-        String fileName = "token";
+    public void testRefreshDLFAuthTokenFileAuthProvider() throws IOException, 
InterruptedException {
+        String fileName = UUID.randomUUID().toString();
         Pair<File, String> tokenFile2Token = 
generateTokenAndWriteToFile(fileName);
         String token = tokenFile2Token.getRight();
         File tokenFile = tokenFile2Token.getLeft();
-        Map<String, String> initialHeaders = new HashMap<>();
-        long expiresInMillis = 1000L;
+        long tokenRefreshInMills = 1000;
         AuthProvider authProvider =
-                new BearTokenFileAuthProvider(tokenFile.getPath(), 
expiresInMillis);
+                generateDLFAuthProvider(Optional.of(tokenRefreshInMills), 
fileName, "serverUrl");
         ScheduledExecutorService executor =
                 ThreadPoolUtils.createScheduledThreadPool(1, "refresh-token");
-        AuthSession session =
-                AuthSession.fromRefreshAuthProvider(executor, initialHeaders, 
authProvider);
-        Map<String, String> header = session.getHeaders();
-        assertEquals(header.get("Authorization"), "Bearer " + token);
+        AuthSession session = AuthSession.fromRefreshAuthProvider(executor, 
authProvider);
+        DLFAuthProvider dlfAuthProvider = (DLFAuthProvider) 
session.getAuthProvider();
+        String authToken = 
OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
+        assertEquals(authToken, token);
         tokenFile.delete();
         tokenFile2Token = generateTokenAndWriteToFile(fileName);
         token = tokenFile2Token.getRight();
-        Thread.sleep(expiresInMillis + 500L);
-        header = session.getHeaders();
-        assertEquals(header.get("Authorization"), "Bearer " + token);
+        Thread.sleep(tokenRefreshInMills * 2);
+        authToken = 
OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
+        assertEquals(authToken, token);
     }
 
     @Test
     public void testRefreshAuthProviderIsSoonExpire() throws IOException, 
InterruptedException {
-        String fileName = "token";
+        String fileName = UUID.randomUUID().toString();
         Pair<File, String> tokenFile2Token = 
generateTokenAndWriteToFile(fileName);
         String token = tokenFile2Token.getRight();
         File tokenFile = tokenFile2Token.getLeft();
-        Map<String, String> initialHeaders = new HashMap<>();
-        long expiresInMillis = 1000L;
+        long tokenRefreshInMills = 5000L;
         AuthProvider authProvider =
-                new BearTokenFileAuthProvider(tokenFile.getPath(), 
expiresInMillis);
-        AuthSession session =
-                AuthSession.fromRefreshAuthProvider(null, initialHeaders, 
authProvider);
-        Map<String, String> header = session.getHeaders();
-        assertEquals(header.get("Authorization"), "Bearer " + token);
+                generateDLFAuthProvider(Optional.of(tokenRefreshInMills), 
fileName, "serverUrl");
+        AuthSession session = AuthSession.fromRefreshAuthProvider(null, 
authProvider);
+        DLFAuthProvider dlfAuthProvider = (DLFAuthProvider) 
session.getAuthProvider();
+        String authToken = 
OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
+        assertEquals(token, authToken);
+        Thread.sleep((long) (tokenRefreshInMills * (1 - 
DLFAuthProvider.EXPIRED_FACTOR)) + 10L);
         tokenFile.delete();
         tokenFile2Token = generateTokenAndWriteToFile(fileName);
         token = tokenFile2Token.getRight();
         tokenFile = tokenFile2Token.getLeft();
         FileUtils.writeStringToFile(tokenFile, token);
-        Thread.sleep(
-                (long) (expiresInMillis * (1 - 
BearTokenFileAuthProvider.EXPIRED_FACTOR)) + 10L);
-        header = session.getHeaders();
-        assertEquals(header.get("Authorization"), "Bearer " + token);
+        dlfAuthProvider = (DLFAuthProvider) session.getAuthProvider();
+        authToken = 
OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
+        assertEquals(token, authToken);
     }
 
     @Test
     public void testRetryWhenRefreshFail() throws Exception {
-        Map<String, String> initialHeaders = new HashMap<>();
-        AuthProvider authProvider = 
Mockito.mock(BearTokenFileAuthProvider.class);
+        AuthProvider authProvider = Mockito.mock(DLFAuthProvider.class);
         long expiresAtMillis = System.currentTimeMillis() - 1000L;
         
when(authProvider.expiresAtMillis()).thenReturn(Optional.of(expiresAtMillis));
-        when(authProvider.expiresInMills()).thenReturn(Optional.of(50L));
-        when(authProvider.supportRefresh()).thenReturn(true);
+        when(authProvider.tokenRefreshInMills()).thenReturn(Optional.of(50L));
         when(authProvider.keepRefreshed()).thenReturn(true);
         when(authProvider.refresh()).thenReturn(false);
-        AuthSession session =
-                AuthSession.fromRefreshAuthProvider(null, initialHeaders, 
authProvider);
+        AuthSession session = AuthSession.fromRefreshAuthProvider(null, 
authProvider);
         AuthSession.scheduleTokenRefresh(
                 ThreadPoolUtils.createScheduledThreadPool(1, "refresh-token"),
                 session,
@@ -149,10 +161,128 @@ public class AuthSessionTest {
         assertEquals(timeToWait, MAX_REFRESH_WINDOW_MILLIS);
     }
 
+    @Test
+    public void testCreateDLFAuthProviderByStsToken() throws IOException {
+        Options options = new Options();
+        String akId = UUID.randomUUID().toString();
+        String akSecret = UUID.randomUUID().toString();
+        String securityToken = UUID.randomUUID().toString();
+        DLFToken token = new DLFToken(akId, akSecret, securityToken, null);
+        options.set(DLF_ACCESS_KEY_ID.key(), token.getAccessKeyId());
+        options.set(DLF_ACCESS_KEY_SECRET.key(), token.getAccessKeySecret());
+        options.set(DLF_SECURITY_TOKEN.key(), token.getSecurityToken());
+        AuthProvider authProvider =
+                
AuthProviderFactory.createAuthProvider(AuthProviderEnum.DLF.identifier(), 
options);
+        AuthSession session = AuthSession.fromRefreshAuthProvider(null, 
authProvider);
+        DLFAuthProvider dlfAuthProvider = (DLFAuthProvider) 
session.getAuthProvider();
+        String authToken = 
OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
+        assertEquals(OBJECT_MAPPER_INSTANCE.writeValueAsString(token), 
authToken);
+    }
+
+    @Test
+    public void testCreateDLFAuthProviderByAk() throws IOException {
+        Options options = new Options();
+        String akId = UUID.randomUUID().toString();
+        String akSecret = UUID.randomUUID().toString();
+        DLFToken token = new DLFToken(akId, akSecret, null, null);
+        options.set(DLF_ACCESS_KEY_ID.key(), token.getAccessKeyId());
+        options.set(DLF_ACCESS_KEY_SECRET.key(), token.getAccessKeySecret());
+        AuthProvider authProvider =
+                
AuthProviderFactory.createAuthProvider(AuthProviderEnum.DLF.identifier(), 
options);
+        AuthSession session = AuthSession.fromRefreshAuthProvider(null, 
authProvider);
+        DLFAuthProvider dlfAuthProvider = (DLFAuthProvider) 
session.getAuthProvider();
+        String authToken = 
OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
+        assertEquals(OBJECT_MAPPER_INSTANCE.writeValueAsString(token), 
authToken);
+    }
+
+    @Test
+    public void testCreateDlfAuthProviderByFileNoDefineRefresh() throws 
IOException {
+        String fileName = UUID.randomUUID().toString();
+        Pair<File, String> tokenFile2Token = 
generateTokenAndWriteToFile(fileName);
+        String token = tokenFile2Token.getRight();
+        AuthProvider authProvider =
+                generateDLFAuthProvider(Optional.empty(), fileName, 
"serverUrl");
+        ScheduledExecutorService executor =
+                ThreadPoolUtils.createScheduledThreadPool(1, "refresh-token");
+        AuthSession session = AuthSession.fromRefreshAuthProvider(executor, 
authProvider);
+        DLFAuthProvider dlfAuthProvider = (DLFAuthProvider) 
session.getAuthProvider();
+        String authToken = 
OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
+        assertEquals(authToken, token);
+    }
+
+    @Test
+    public void testCreateDLFAuthProviderWithoutNeedConf() {
+        assertThrows(
+                IllegalArgumentException.class,
+                () ->
+                        AuthProviderFactory.createAuthProvider(
+                                AuthProviderEnum.DLF.identifier(), new 
Options()));
+    }
+
+    @Test
+    public void testDLFAuthProviderAuthHeaderWhenDataIsNotEmpty() throws 
Exception {
+        String fileName = UUID.randomUUID().toString();
+        Pair<File, String> tokenFile2Token = 
generateTokenAndWriteToFile(fileName);
+        String tokenStr = tokenFile2Token.getRight();
+        String serverUrl = "https://dlf.cn-hangzhou.aliyuncs.com";;
+        AuthProvider authProvider = generateDLFAuthProvider(Optional.empty(), 
fileName, serverUrl);
+        DLFToken token = OBJECT_MAPPER_INSTANCE.readValue(tokenStr, 
DLFToken.class);
+        Map<String, String> parameters = new HashMap<>();
+        parameters.put("k1", "v1");
+        parameters.put("k2", "v2");
+        String data = "data";
+        RESTAuthParameter restAuthParameter =
+                new RESTAuthParameter(serverUrl, "/path", parameters, 
"method", "data");
+        Map<String, String> header = authProvider.header(new HashMap<>(), 
restAuthParameter);
+        String authorization = header.get(DLF_AUTHORIZATION_HEADER_KEY);
+        String[] credentials = authorization.split(",")[0].split(" 
")[1].split("/");
+        String dateTime = header.get(DLF_DATE_HEADER_KEY);
+        String date = credentials[1];
+        String newAuthorization =
+                DLFAuthSignature.getAuthorization(
+                        new RESTAuthParameter(serverUrl, "/path", parameters, 
"method", "data"),
+                        token,
+                        "cn-hangzhou",
+                        header,
+                        dateTime,
+                        date);
+        assertEquals(newAuthorization, authorization);
+        assertEquals(restAuthParameter.host(), 
header.get(DLFAuthProvider.DLF_HOST_HEADER_KEY));
+        assertEquals(
+                token.getSecurityToken(),
+                header.get(DLFAuthProvider.DLF_SECURITY_TOKEN_HEADER_KEY));
+        assertTrue(header.containsKey(DLF_DATE_HEADER_KEY));
+        assertEquals(
+                DLFAuthSignature.VERSION, 
header.get(DLFAuthProvider.DLF_AUTH_VERSION_HEADER_KEY));
+        assertEquals(
+                DLFAuthProvider.MEDIA_TYPE.toString(),
+                header.get(DLFAuthProvider.DLF_CONTENT_TYPE_KEY));
+        assertEquals(
+                DLFAuthSignature.md5(data), 
header.get(DLFAuthProvider.DLF_CONTENT_MD5_HEADER_KEY));
+        assertEquals(
+                DLFAuthProvider.DLF_CONTENT_SHA56_VALUE,
+                header.get(DLFAuthProvider.DLF_CONTENT_SHA56_HEADER_KEY));
+    }
+
     private Pair<File, String> generateTokenAndWriteToFile(String fileName) 
throws IOException {
         File tokenFile = folder.newFile(fileName);
-        String token = UUID.randomUUID().toString();
-        FileUtils.writeStringToFile(tokenFile, token);
-        return Pair.of(tokenFile, token);
+        ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
+        String expiration = now.format(TOKEN_DATE_FORMATTER);
+        String secret = UUID.randomUUID().toString();
+        DLFToken token = new DLFToken("accessKeyId", secret, "securityToken", 
expiration);
+        String tokenStr = OBJECT_MAPPER_INSTANCE.writeValueAsString(token);
+        FileUtils.writeStringToFile(tokenFile, tokenStr);
+        return Pair.of(tokenFile, tokenStr);
+    }
+
+    private AuthProvider generateDLFAuthProvider(
+            Optional<Long> tokenRefreshInMillsOpt, String fileName, String 
serverUrl) {
+        Options options = new Options();
+        options.set(DLF_TOKEN_PATH.key(), folder.getRoot().getPath() + "/" + 
fileName);
+        options.set(RESTCatalogOptions.URI.key(), serverUrl);
+        tokenRefreshInMillsOpt.ifPresent(
+                tokenRefreshInMills ->
+                        options.set(TOKEN_REFRESH_TIME.key(), 
tokenRefreshInMills + "ms"));
+        return 
AuthProviderFactory.createAuthProvider(AuthProviderEnum.DLF.identifier(), 
options);
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/auth/DLFAuthSignatureTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/rest/auth/DLFAuthSignatureTest.java
new file mode 100644
index 0000000000..3a0caf3a49
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/rest/auth/DLFAuthSignatureTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.auth;
+
+import org.apache.paimon.rest.MockRESTMessage;
+import org.apache.paimon.rest.RESTObjectMapper;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Test for {@link DLFAuthSignature}. */
+public class DLFAuthSignatureTest {
+
+    @Test
+    public void testGetAuthorization() throws Exception {
+        String endpoint = "dlf.cn-hangzhou.aliyuncs.com";
+        String region = "cn-hangzhou";
+        String dateTime = "20231203T121212Z";
+        String date = "20231203";
+        Map<String, String> parameters = new HashMap<>();
+        parameters.put("k1", "v1");
+        parameters.put("k2", "v2");
+        String data =
+                RESTObjectMapper.OBJECT_MAPPER.writeValueAsString(
+                        MockRESTMessage.createDatabaseRequest("database"));
+        RESTAuthParameter restAuthParameter =
+                new RESTAuthParameter(endpoint, "/v1/paimon/databases", 
parameters, "POST", data);
+        DLFToken token = new DLFToken("access-key-id", "access-key-secret", 
"securityToken", null);
+        Map<String, String> signHeaders =
+                DLFAuthProvider.generateSignHeaders(
+                        restAuthParameter.host(),
+                        restAuthParameter.data(),
+                        dateTime,
+                        "securityToken");
+        String authorization =
+                DLFAuthSignature.getAuthorization(
+                        restAuthParameter, token, region, signHeaders, 
dateTime, date);
+        Assertions.assertEquals(
+                "DLF4-HMAC-SHA256 
Credential=access-key-id/20231203/cn-hangzhou/DlfNext/aliyun_v4_request,AdditionalHeaders=host,Signature=5afbdad67b52f17c47e202da2222bff9f5cf2f86c3ed973bb919a8216d086fb7",
+                authorization);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
index c310de32fd..145fcd0ba3 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink;
 
 import org.apache.paimon.rest.RESTCatalogOptions;
 import org.apache.paimon.rest.RESTCatalogServer;
+import org.apache.paimon.rest.auth.AuthProviderEnum;
 
 import org.apache.flink.types.Row;
 import org.junit.jupiter.api.AfterEach;
@@ -101,6 +102,7 @@ class RESTCatalogITCase extends CatalogITCaseBase {
         options.put(RESTCatalogOptions.URI.key(), serverUrl);
         options.put(RESTCatalogOptions.TOKEN.key(), initToken);
         options.put(RESTCatalogOptions.THREAD_POOL_SIZE.key(), "" + 1);
+        options.put(RESTCatalogOptions.TOKEN_PROVIDER.key(), 
AuthProviderEnum.BEAR.identifier());
         return options;
     }
 

Reply via email to