This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 98e46495cd [rest] Simplify auth provider to synchronously refresh 
token (#5562)
98e46495cd is described below

commit 98e46495cd4849ab3be73916cf8c15ea6e5c5a3c
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Apr 30 23:17:49 2025 +0800

    [rest] Simplify auth provider to synchronously refresh token (#5562)
---
 .../java/org/apache/paimon/rest/HttpClient.java    |   3 -
 .../java/org/apache/paimon/rest/RESTCatalog.java   |  36 +--
 .../org/apache/paimon/rest/RESTCatalogOptions.java |   8 -
 .../java/org/apache/paimon/rest/RESTClient.java    |   3 +-
 .../org/apache/paimon/rest/RESTTokenFileIO.java    |   9 +-
 .../org/apache/paimon/rest/auth/AuthProvider.java  |  22 +-
 .../paimon/rest/auth/AuthProviderFactory.java      |  11 +-
 .../org/apache/paimon/rest/auth/AuthSession.java   | 152 -------------
 .../paimon/rest/auth/BearTokenAuthProvider.java    |   7 +-
 .../apache/paimon/rest/auth/DLFAuthProvider.java   | 138 +++++-------
 .../paimon/rest/auth/DLFAuthProviderFactory.java   |   9 +-
 .../apache/paimon/rest/auth/DLFECSTokenLoader.java |   7 +-
 .../paimon/rest/auth/DLFLocalFileTokenLoader.java  |  42 ++--
 .../java/org/apache/paimon/rest/auth/DLFToken.java |  28 ++-
 .../apache/paimon/rest/auth/DLFTokenLoader.java    |   2 +
 .../apache/paimon/rest/auth/RESTAuthFunction.java  |  11 +-
 .../org/apache/paimon/rest/HttpClientTest.java     |   4 +-
 .../apache/paimon/rest/MockRESTCatalogTest.java    |   4 +-
 .../org/apache/paimon/rest/RESTCatalogServer.java  |   2 +-
 .../org/apache/paimon/rest/RESTCatalogTest.java    |   2 +-
 ...{AuthSessionTest.java => AuthProviderTest.java} | 247 +++++++--------------
 .../paimon/rest/auth/CustomTestDLFTokenLoader.java |   7 +-
 22 files changed, 241 insertions(+), 513 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java
index dca501b693..12f7f2938a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java
@@ -241,7 +241,4 @@ public class HttpClient implements RESTClient {
                 new RESTAuthParameter(path, queryParams, method, data);
         return headerFunction.apply(restAuthParameter);
     }
-
-    @Override
-    public void close() {}
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index 50c8ebbe65..01e4e75e95 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -33,7 +33,7 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.Partition;
 import org.apache.paimon.partition.PartitionStatistics;
-import org.apache.paimon.rest.auth.AuthSession;
+import org.apache.paimon.rest.auth.AuthProvider;
 import org.apache.paimon.rest.auth.RESTAuthFunction;
 import org.apache.paimon.rest.auth.RESTAuthParameter;
 import org.apache.paimon.rest.exceptions.AlreadyExistsException;
@@ -102,7 +102,6 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -118,8 +117,7 @@ import static 
org.apache.paimon.catalog.CatalogUtils.validateAutoCreateClose;
 import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE;
 import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
 import static org.apache.paimon.rest.RESTUtil.extractPrefixMap;
-import static org.apache.paimon.rest.auth.AuthSession.createAuthSession;
-import static 
org.apache.paimon.utils.ThreadPoolUtils.createScheduledThreadPool;
+import static 
org.apache.paimon.rest.auth.AuthProviderFactory.createAuthProvider;
 
 /** A catalog implementation for REST. */
 public class RESTCatalog implements Catalog {
@@ -132,6 +130,7 @@ public class RESTCatalog implements Catalog {
     public static final String TABLE_NAME_PATTERN = "tableNamePattern";
     public static final String VIEW_NAME_PATTERN = "viewNamePattern";
     public static final String PARTITION_NAME_PATTERN = "partitionNamePattern";
+    public static final long TOKEN_EXPIRATION_SAFE_TIME_MILLIS = 3_600_000L;
 
     private final RESTClient client;
     private final ResourcePaths resourcePaths;
@@ -139,15 +138,13 @@ public class RESTCatalog implements Catalog {
     private final boolean dataTokenEnabled;
     private final RESTAuthFunction restAuthFunction;
 
-    private volatile ScheduledExecutorService refreshExecutor = null;
-
     public RESTCatalog(CatalogContext context) {
         this(context, true);
     }
 
     public RESTCatalog(CatalogContext context, boolean configRequired) {
         this.client = new 
HttpClient(context.options().get(RESTCatalogOptions.URI));
-        AuthSession catalogAuth = createAuthSession(context.options(), 
tokenRefreshExecutor());
+        AuthProvider authProvider = createAuthProvider(context.options());
         Options options = context.options();
         Map<String, String> baseHeaders = Collections.emptyMap();
         if (configRequired) {
@@ -165,11 +162,11 @@ public class RESTCatalog implements Catalog {
                                             queryParams,
                                             ConfigResponse.class,
                                             new RESTAuthFunction(
-                                                    Collections.emptyMap(), 
catalogAuth))
+                                                    Collections.emptyMap(), 
authProvider))
                                     .merge(context.options().toMap()));
             baseHeaders.putAll(extractPrefixMap(options, HEADER_PREFIX));
         }
-        this.restAuthFunction = new RESTAuthFunction(baseHeaders, catalogAuth);
+        this.restAuthFunction = new RESTAuthFunction(baseHeaders, 
authProvider);
         context = CatalogContext.create(options, context.preferIO(), 
context.fallbackIO());
         this.context = context;
         this.resourcePaths = ResourcePaths.forCatalogProperties(options);
@@ -983,14 +980,7 @@ public class RESTCatalog implements Catalog {
     }
 
     @Override
-    public void close() throws Exception {
-        if (refreshExecutor != null) {
-            refreshExecutor.shutdownNow();
-        }
-        if (client != null) {
-            client.close();
-        }
-    }
+    public void close() throws Exception {}
 
     @VisibleForTesting
     Map<String, String> headers(RESTAuthParameter restAuthParameter) {
@@ -1036,18 +1026,6 @@ public class RESTCatalog implements Catalog {
         return results;
     }
 
-    private ScheduledExecutorService tokenRefreshExecutor() {
-        if (refreshExecutor == null) {
-            synchronized (this) {
-                if (refreshExecutor == null) {
-                    this.refreshExecutor = createScheduledThreadPool(1, 
"token-refresh-thread");
-                }
-            }
-        }
-
-        return refreshExecutor;
-    }
-
     private FileIO fileIOForData(Path path, Identifier identifier) {
         return dataTokenEnabled
                 ? new RESTTokenFileIO(catalogLoader(), this, identifier, path)
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java
index 0359f6fa25..8c5cc586b2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java
@@ -21,8 +21,6 @@ package org.apache.paimon.rest;
 import org.apache.paimon.options.ConfigOption;
 import org.apache.paimon.options.ConfigOptions;
 
-import java.time.Duration;
-
 /** Options for REST Catalog. */
 public class RESTCatalogOptions {
 
@@ -38,12 +36,6 @@ public class RESTCatalogOptions {
                     .noDefaultValue()
                     .withDescription("REST Catalog auth bear token.");
 
-    public static final ConfigOption<Duration> TOKEN_REFRESH_TIME =
-            ConfigOptions.key("token.refresh-time")
-                    .durationType()
-                    .defaultValue(Duration.ofHours(1))
-                    .withDescription("REST Catalog auth token refresh time.");
-
     public static final ConfigOption<String> TOKEN_PROVIDER =
             ConfigOptions.key("token.provider")
                     .stringType()
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTClient.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTClient.java
index ac1a27a2be..b2058ec806 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTClient.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTClient.java
@@ -20,11 +20,10 @@ package org.apache.paimon.rest;
 
 import org.apache.paimon.rest.auth.RESTAuthFunction;
 
-import java.io.Closeable;
 import java.util.Map;
 
 /** Interface for a basic HTTP Client for interfacing with the REST catalog. */
-public interface RESTClient extends Closeable {
+public interface RESTClient {
 
     <T extends RESTResponse> T get(
             String path, Class<T> responseType, RESTAuthFunction 
restAuthFunction);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java
index bcf7840dc5..13e31e8634 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java
@@ -46,6 +46,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.paimon.options.CatalogOptions.FILE_IO_ALLOW_CACHE;
+import static 
org.apache.paimon.rest.RESTCatalog.TOKEN_EXPIRATION_SAFE_TIME_MILLIS;
 
 /** A {@link FileIO} to support getting token from REST Server. */
 public class RESTTokenFileIO implements FileIO {
@@ -191,11 +192,13 @@ public class RESTTokenFileIO implements FileIO {
     }
 
     private boolean shouldRefresh() {
-        return token == null || token.expireAtMillis() - 
System.currentTimeMillis() < 3600_000L;
+        return token == null
+                || token.expireAtMillis() - System.currentTimeMillis()
+                        < TOKEN_EXPIRATION_SAFE_TIME_MILLIS;
     }
 
     private void refreshToken() {
-        LOG.info("begin refresh token for identifier [{}]", identifier);
+        LOG.info("begin refresh data token for identifier [{}]", identifier);
         GetTableTokenResponse response;
         if (catalogInstance != null) {
             try {
@@ -211,7 +214,7 @@ public class RESTTokenFileIO implements FileIO {
             }
         }
         LOG.info(
-                "end refresh token for identifier [{}] expiresAtMillis [{}]",
+                "end refresh data token for identifier [{}] expiresAtMillis 
[{}]",
                 identifier,
                 response.getExpiresAtMillis());
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthProvider.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthProvider.java
index 666337ec79..d7755ff053 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthProvider.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthProvider.java
@@ -19,28 +19,10 @@
 package org.apache.paimon.rest.auth;
 
 import java.util.Map;
-import java.util.Optional;
 
 /** Authentication provider. */
 public interface AuthProvider {
 
-    Map<String, String> header(Map<String, String> baseHeader, 
RESTAuthParameter restAuthParameter);
-
-    boolean refresh();
-
-    default boolean keepRefreshed() {
-        return false;
-    }
-
-    default boolean willSoonExpire() {
-        return false;
-    }
-
-    default Optional<Long> expiresAtMillis() {
-        return Optional.empty();
-    }
-
-    default Optional<Long> tokenRefreshInMills() {
-        return Optional.empty();
-    }
+    Map<String, String> mergeAuthHeader(
+            Map<String, String> baseHeader, RESTAuthParameter 
restAuthParameter);
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthProviderFactory.java
 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthProviderFactory.java
index 1cccfeb626..7562451a34 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthProviderFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthProviderFactory.java
@@ -21,18 +21,25 @@ package org.apache.paimon.rest.auth;
 import org.apache.paimon.factories.Factory;
 import org.apache.paimon.factories.FactoryUtil;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.utils.StringUtils;
+
+import static org.apache.paimon.rest.RESTCatalogOptions.TOKEN_PROVIDER;
 
 /** Factory for {@link AuthProvider}. */
 public interface AuthProviderFactory extends Factory {
 
     AuthProvider create(Options options);
 
-    static AuthProvider createAuthProvider(String name, Options options) {
+    static AuthProvider createAuthProvider(Options options) {
+        String tokenProvider = options.get(TOKEN_PROVIDER);
+        if (StringUtils.isEmpty(tokenProvider)) {
+            throw new IllegalArgumentException("token.provider is not set.");
+        }
         AuthProviderFactory factory =
                 FactoryUtil.discoverFactory(
                         AuthProviderFactory.class.getClassLoader(),
                         AuthProviderFactory.class,
-                        name);
+                        tokenProvider);
         return factory.create(options);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthSession.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthSession.java
deleted file mode 100644
index 1ce391e00d..0000000000
--- a/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthSession.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.rest.auth;
-
-import org.apache.paimon.annotation.VisibleForTesting;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.rest.RESTCatalogOptions;
-import org.apache.paimon.utils.StringUtils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Optional;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-/** Authentication session. */
-public class AuthSession {
-
-    static final int REFRESH_NUM_RETRIES = 5;
-    static final long MIN_REFRESH_WAIT_MILLIS = 10;
-    static final long MAX_REFRESH_WINDOW_MILLIS = 300_000; // 5 minutes
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(AuthSession.class);
-
-    private final AuthProvider authProvider;
-
-    public AuthSession(AuthProvider authProvider) {
-        this.authProvider = authProvider;
-    }
-
-    public static AuthSession fromRefreshAuthProvider(
-            ScheduledExecutorService executor, AuthProvider authProvider) {
-        AuthSession session = new AuthSession(authProvider);
-
-        long startTimeMillis = System.currentTimeMillis();
-        Optional<Long> expiresAtMillisOpt = authProvider.expiresAtMillis();
-
-        // when init session if token expire time is in the past, refresh it 
and update
-        // expiresAtMillis
-        if (expiresAtMillisOpt.isPresent() && expiresAtMillisOpt.get() <= 
startTimeMillis) {
-            boolean refreshSuccessful = session.refresh();
-            if (refreshSuccessful) {
-                expiresAtMillisOpt = session.authProvider.expiresAtMillis();
-            }
-        }
-
-        if (null != executor && expiresAtMillisOpt.isPresent()) {
-            scheduleTokenRefresh(executor, session, expiresAtMillisOpt.get());
-        }
-
-        return session;
-    }
-
-    public AuthProvider getAuthProvider() {
-        if (this.authProvider.keepRefreshed() && 
this.authProvider.willSoonExpire()) {
-            refresh();
-        }
-        return this.authProvider;
-    }
-
-    public Boolean refresh() {
-        if (this.authProvider.keepRefreshed()
-                && this.authProvider.tokenRefreshInMills().isPresent()) {
-            return this.authProvider.refresh();
-        }
-
-        return false;
-    }
-
-    @VisibleForTesting
-    static void scheduleTokenRefresh(
-            ScheduledExecutorService executor, AuthSession session, long 
expiresAtMillis) {
-        scheduleTokenRefresh(executor, session, expiresAtMillis, 0);
-    }
-
-    @VisibleForTesting
-    static long getTimeToWaitByExpiresInMills(long expiresInMillis) {
-        // how much ahead of time to start the refresh to allow it to complete
-        long refreshWindowMillis = Math.min(expiresInMillis, 
MAX_REFRESH_WINDOW_MILLIS);
-        // how much time to wait before expiration
-        long waitIntervalMillis = expiresInMillis - refreshWindowMillis;
-        // how much time to actually wait
-        return Math.max(waitIntervalMillis, MIN_REFRESH_WAIT_MILLIS);
-    }
-
-    private static void scheduleTokenRefresh(
-            ScheduledExecutorService executor,
-            AuthSession session,
-            long expiresAtMillis,
-            int retryTimes) {
-        if (retryTimes < REFRESH_NUM_RETRIES) {
-            long expiresInMillis = expiresAtMillis - 
System.currentTimeMillis();
-            long timeToWait = getTimeToWaitByExpiresInMills(expiresInMillis);
-
-            executor.schedule(
-                    () -> doRefresh(executor, session, expiresAtMillis, 
retryTimes),
-                    timeToWait,
-                    TimeUnit.MILLISECONDS);
-        } else {
-            LOG.warn("Failed to refresh token after {} retries.", 
REFRESH_NUM_RETRIES);
-        }
-    }
-
-    private static void doRefresh(
-            ScheduledExecutorService executor,
-            AuthSession session,
-            long expiresAtMillis,
-            int retryTimes) {
-        long refreshStartTime = System.currentTimeMillis();
-        boolean isSuccessful = session.refresh();
-        if (isSuccessful) {
-            scheduleTokenRefresh(
-                    executor,
-                    session,
-                    refreshStartTime + 
session.authProvider.tokenRefreshInMills().get(),
-                    0);
-        } else {
-            scheduleTokenRefresh(executor, session, expiresAtMillis, 
retryTimes + 1);
-        }
-    }
-
-    public static AuthSession createAuthSession(
-            Options options, ScheduledExecutorService refreshExecutor) {
-        String tokenProvider = options.get(RESTCatalogOptions.TOKEN_PROVIDER);
-        if (StringUtils.isEmpty(tokenProvider)) {
-            throw new IllegalArgumentException("token.provider is not set.");
-        }
-        AuthProvider authProvider = 
AuthProviderFactory.createAuthProvider(tokenProvider, options);
-        if (authProvider.keepRefreshed()) {
-            return AuthSession.fromRefreshAuthProvider(refreshExecutor, 
authProvider);
-        } else {
-            return new AuthSession(authProvider);
-        }
-    }
-}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenAuthProvider.java
 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenAuthProvider.java
index a501b98556..39625818a6 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenAuthProvider.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenAuthProvider.java
@@ -35,15 +35,10 @@ public class BearTokenAuthProvider implements AuthProvider {
     }
 
     @Override
-    public Map<String, String> header(
+    public Map<String, String> mergeAuthHeader(
             Map<String, String> baseHeader, RESTAuthParameter 
restAuthParameter) {
         Map<String, String> headersWithAuth = new HashMap<>(baseHeader);
         headersWithAuth.put(AUTHORIZATION_HEADER_KEY, BEARER_PREFIX + token);
         return headersWithAuth;
     }
-
-    @Override
-    public boolean refresh() {
-        return true;
-    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFAuthProvider.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFAuthProvider.java
index f82720b33f..7eece82a92 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFAuthProvider.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFAuthProvider.java
@@ -18,19 +18,28 @@
 
 package org.apache.paimon.rest.auth;
 
+import org.apache.paimon.annotation.VisibleForTesting;
+
 import okhttp3.MediaType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
 
-import java.time.LocalDateTime;
 import java.time.ZoneOffset;
 import java.time.ZonedDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
+
+import static 
org.apache.paimon.rest.RESTCatalog.TOKEN_EXPIRATION_SAFE_TIME_MILLIS;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
 /** Auth provider for <b>Ali CLoud</b> DLF. */
 public class DLFAuthProvider implements AuthProvider {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(DLFAuthProvider.class);
+
     public static final String DLF_AUTHORIZATION_HEADER_KEY = "Authorization";
     public static final String DLF_CONTENT_MD5_HEADER_KEY = "Content-MD5";
     public static final String DLF_CONTENT_TYPE_KEY = "Content-Type";
@@ -39,55 +48,36 @@ public class DLFAuthProvider implements AuthProvider {
     public static final String DLF_AUTH_VERSION_HEADER_KEY = "x-dlf-version";
     public static final String DLF_CONTENT_SHA56_HEADER_KEY = 
"x-dlf-content-sha256";
     public static final String DLF_CONTENT_SHA56_VALUE = "UNSIGNED-PAYLOAD";
-    public static final double EXPIRED_FACTOR = 0.4;
-    public static final DateTimeFormatter TOKEN_DATE_FORMATTER =
-            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'");
+
     public static final DateTimeFormatter AUTH_DATE_TIME_FORMATTER =
             DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmss'Z'");
-    public static final DateTimeFormatter AUTH_DATE_FORMATTER =
-            DateTimeFormatter.ofPattern("yyyyMMdd");
     protected static final MediaType MEDIA_TYPE = 
MediaType.parse("application/json");
 
-    private final DLFTokenLoader tokenLoader;
-
-    protected DLFToken token;
-    private final boolean keepRefreshed;
-    private Long expiresAtMillis;
-    private final Long tokenRefreshInMills;
+    @Nullable private final DLFTokenLoader tokenLoader;
+    @Nullable protected DLFToken token;
     private final String region;
 
-    public static DLFAuthProvider buildRefreshToken(
-            DLFTokenLoader tokenLoader, Long tokenRefreshInMills, String 
region) {
-        DLFToken token = tokenLoader.loadToken();
-        Long expiresAtMillis = getExpirationInMills(token.getExpiration());
-        return new DLFAuthProvider(
-                tokenLoader, token, true, expiresAtMillis, 
tokenRefreshInMills, region);
+    public static DLFAuthProvider fromTokenLoader(DLFTokenLoader tokenLoader, 
String region) {
+        return new DLFAuthProvider(tokenLoader, null, region);
     }
 
-    public static DLFAuthProvider buildAKToken(
+    public static DLFAuthProvider fromAccessKey(
             String accessKeyId, String accessKeySecret, String securityToken, 
String region) {
         DLFToken token = new DLFToken(accessKeyId, accessKeySecret, 
securityToken, null);
-        return new DLFAuthProvider(null, token, false, null, null, region);
+        return new DLFAuthProvider(null, token, region);
     }
 
     public DLFAuthProvider(
-            DLFTokenLoader tokenLoader,
-            DLFToken token,
-            boolean keepRefreshed,
-            Long expiresAtMillis,
-            Long tokenRefreshInMills,
-            String region) {
+            @Nullable DLFTokenLoader tokenLoader, @Nullable DLFToken token, 
String region) {
         this.tokenLoader = tokenLoader;
         this.token = token;
-        this.keepRefreshed = keepRefreshed;
-        this.expiresAtMillis = expiresAtMillis;
-        this.tokenRefreshInMills = tokenRefreshInMills;
         this.region = region;
     }
 
     @Override
-    public Map<String, String> header(
+    public Map<String, String> mergeAuthHeader(
             Map<String, String> baseHeader, RESTAuthParameter 
restAuthParameter) {
+        DLFToken token = getFreshToken();
         try {
             String dateTime =
                     baseHeader.getOrDefault(
@@ -109,6 +99,43 @@ public class DLFAuthProvider implements AuthProvider {
         }
     }
 
+    @VisibleForTesting
+    DLFToken getFreshToken() {
+        if (shouldRefresh()) {
+            synchronized (this) {
+                if (shouldRefresh()) {
+                    refreshToken();
+                }
+            }
+        }
+        return token;
+    }
+
+    private void refreshToken() {
+        checkNotNull(tokenLoader);
+        LOG.info("begin refresh meta token for loader [{}]", 
tokenLoader.description());
+        this.token = tokenLoader.loadToken();
+        checkNotNull(token);
+        LOG.info(
+                "end refresh meta token for loader [{}] expiresAtMillis [{}]",
+                tokenLoader.description(),
+                token.getExpirationAtMills());
+    }
+
+    private boolean shouldRefresh() {
+        // no token, get new one
+        if (token == null) {
+            return true;
+        }
+        // never expire
+        Long expireTime = token.getExpirationAtMills();
+        if (expireTime == null) {
+            return false;
+        }
+        long now = System.currentTimeMillis();
+        return expireTime - now < TOKEN_EXPIRATION_SAFE_TIME_MILLIS;
+    }
+
     public static Map<String, String> generateSignHeaders(
             String data, String dateTime, String securityToken) throws 
Exception {
         Map<String, String> signHeaders = new HashMap<>();
@@ -124,53 +151,4 @@ public class DLFAuthProvider implements AuthProvider {
         }
         return signHeaders;
     }
-
-    @Override
-    public boolean refresh() {
-        long start = System.currentTimeMillis();
-        DLFToken newToken = tokenLoader.loadToken();
-        if (newToken == null) {
-            return false;
-        }
-        this.expiresAtMillis = start + this.tokenRefreshInMills;
-        this.token = newToken;
-        return true;
-    }
-
-    @Override
-    public boolean keepRefreshed() {
-        return this.keepRefreshed;
-    }
-
-    @Override
-    public boolean willSoonExpire() {
-        if (keepRefreshed()) {
-            return expiresAtMillis().get() - System.currentTimeMillis()
-                    < tokenRefreshInMills().get() * EXPIRED_FACTOR;
-        } else {
-            return false;
-        }
-    }
-
-    @Override
-    public Optional<Long> expiresAtMillis() {
-        return Optional.ofNullable(this.expiresAtMillis);
-    }
-
-    @Override
-    public Optional<Long> tokenRefreshInMills() {
-        return Optional.ofNullable(this.tokenRefreshInMills);
-    }
-
-    private static Long getExpirationInMills(String dateStr) {
-        try {
-            if (dateStr == null) {
-                return null;
-            }
-            LocalDateTime dateTime = LocalDateTime.parse(dateStr, 
TOKEN_DATE_FORMATTER);
-            return dateTime.atZone(ZoneOffset.UTC).toInstant().toEpochMilli();
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFAuthProviderFactory.java
 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFAuthProviderFactory.java
index 72d9ea5580..030dc396e1 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFAuthProviderFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFAuthProviderFactory.java
@@ -24,7 +24,6 @@ import org.apache.paimon.rest.RESTCatalogOptions;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import static org.apache.paimon.rest.RESTCatalogOptions.TOKEN_REFRESH_TIME;
 import static org.apache.paimon.rest.RESTCatalogOptions.URI;
 
 /** Factory for {@link DLFAuthProvider}. */
@@ -44,16 +43,14 @@ public class DLFAuthProviderFactory implements 
AuthProviderFactory {
             DLFTokenLoader dlfTokenLoader =
                     DLFTokenLoaderFactory.createDLFTokenLoader(
                             options.get(RESTCatalogOptions.DLF_TOKEN_LOADER), 
options);
-            long tokenRefreshInMills = 
options.get(TOKEN_REFRESH_TIME).toMillis();
-            return DLFAuthProvider.buildRefreshToken(dlfTokenLoader, 
tokenRefreshInMills, region);
+            return DLFAuthProvider.fromTokenLoader(dlfTokenLoader, region);
         } else if 
(options.getOptional(RESTCatalogOptions.DLF_TOKEN_PATH).isPresent()) {
             DLFTokenLoader dlfTokenLoader =
                     DLFTokenLoaderFactory.createDLFTokenLoader("local_file", 
options);
-            long tokenRefreshInMills = 
options.get(TOKEN_REFRESH_TIME).toMillis();
-            return DLFAuthProvider.buildRefreshToken(dlfTokenLoader, 
tokenRefreshInMills, region);
+            return DLFAuthProvider.fromTokenLoader(dlfTokenLoader, region);
         } else if 
(options.getOptional(RESTCatalogOptions.DLF_ACCESS_KEY_ID).isPresent()
                 && 
options.getOptional(RESTCatalogOptions.DLF_ACCESS_KEY_SECRET).isPresent()) {
-            return DLFAuthProvider.buildAKToken(
+            return DLFAuthProvider.fromAccessKey(
                     options.get(RESTCatalogOptions.DLF_ACCESS_KEY_ID),
                     options.get(RESTCatalogOptions.DLF_ACCESS_KEY_SECRET),
                     options.get(RESTCatalogOptions.DLF_SECURITY_TOKEN),
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFECSTokenLoader.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFECSTokenLoader.java
index c73f32c990..3806b045d3 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFECSTokenLoader.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFECSTokenLoader.java
@@ -53,7 +53,7 @@ public class DLFECSTokenLoader implements DLFTokenLoader {
                     .readTimeout(Duration.ofMinutes(3))
                     .build();
 
-    private String ecsMetadataURL;
+    private final String ecsMetadataURL;
 
     private String roleName;
 
@@ -70,6 +70,11 @@ public class DLFECSTokenLoader implements DLFTokenLoader {
         return getToken(ecsMetadataURL + roleName);
     }
 
+    @Override
+    public String description() {
+        return ecsMetadataURL;
+    }
+
     private static String getRole(String url) {
         try {
             return getResponseBody(url);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFLocalFileTokenLoader.java
 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFLocalFileTokenLoader.java
index c1aea84280..22d0f4090d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFLocalFileTokenLoader.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFLocalFileTokenLoader.java
@@ -21,17 +21,12 @@ package org.apache.paimon.rest.auth;
 import org.apache.paimon.utils.FileIOUtils;
 
 import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.UncheckedIOException;
 
 import static org.apache.paimon.rest.RESTObjectMapper.OBJECT_MAPPER;
 
 /** DLF Token Loader for local file. */
 public class DLFLocalFileTokenLoader implements DLFTokenLoader {
 
-    private static final long[] READ_TOKEN_FILE_BACKOFF_WAIT_TIME_MILLIS = 
{1_000, 3_000, 5_000};
-
     private final String tokenFilePath;
 
     public DLFLocalFileTokenLoader(String tokenFilePath) {
@@ -40,25 +35,32 @@ public class DLFLocalFileTokenLoader implements 
DLFTokenLoader {
 
     @Override
     public DLFToken loadToken() {
-        return readToken(tokenFilePath, 0);
+        return readToken(tokenFilePath);
     }
 
-    protected static DLFToken readToken(String tokenFilePath, int retryTimes) {
-        try {
-            File tokenFile = new File(tokenFilePath);
-            if (tokenFile.exists()) {
-                String tokenStr = FileIOUtils.readFileUtf8(tokenFile);
+    @Override
+    public String description() {
+        return tokenFilePath;
+    }
+
+    protected static DLFToken readToken(String tokenFilePath) {
+        int retry = 1;
+        Exception lastException = null;
+        while (retry <= 5) {
+            try {
+                String tokenStr = FileIOUtils.readFileUtf8(new 
File(tokenFilePath));
                 return OBJECT_MAPPER.readValue(tokenStr, DLFToken.class);
-            } else if (retryTimes < 
READ_TOKEN_FILE_BACKOFF_WAIT_TIME_MILLIS.length - 1) {
-                
Thread.sleep(READ_TOKEN_FILE_BACKOFF_WAIT_TIME_MILLIS[retryTimes]);
-                return readToken(tokenFilePath, retryTimes + 1);
-            } else {
-                throw new FileNotFoundException(tokenFilePath);
+            } catch (Exception e) {
+                lastException = e;
+            }
+            try {
+                Thread.sleep(retry * 1000L);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
             }
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
-        } catch (InterruptedException e) {
-            throw new RuntimeException(e);
+            retry++;
         }
+        throw new RuntimeException(lastException);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFToken.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFToken.java
index c961dfe38b..37590cad8c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFToken.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFToken.java
@@ -19,15 +19,25 @@
 package org.apache.paimon.rest.auth;
 
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
+import javax.annotation.Nullable;
+
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
 import java.util.Objects;
 
 /** <b>Ali CLoud</b> DLF Token. */
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class DLFToken {
 
+    public static final DateTimeFormatter TOKEN_DATE_FORMATTER =
+            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'");
+
     private static final String ACCESS_KEY_ID_FIELD_NAME = "AccessKeyId";
     private static final String ACCESS_KEY_SECRET_FIELD_NAME = 
"AccessKeySecret";
     private static final String SECURITY_TOKEN_FIELD_NAME = "SecurityToken";
@@ -42,19 +52,30 @@ public class DLFToken {
     @JsonProperty(SECURITY_TOKEN_FIELD_NAME)
     private final String securityToken;
 
+    @JsonInclude(JsonInclude.Include.NON_NULL)
     @JsonProperty(EXPIRATION_FIELD_NAME)
+    @Nullable
     private final String expiration;
 
+    @JsonIgnore @Nullable private final Long expirationAtMills;
+
     @JsonCreator
     public DLFToken(
             @JsonProperty(ACCESS_KEY_ID_FIELD_NAME) String accessKeyId,
             @JsonProperty(ACCESS_KEY_SECRET_FIELD_NAME) String accessKeySecret,
             @JsonProperty(SECURITY_TOKEN_FIELD_NAME) String securityToken,
-            @JsonProperty(EXPIRATION_FIELD_NAME) String expiration) {
+            @Nullable @JsonProperty(EXPIRATION_FIELD_NAME) String expiration) {
         this.accessKeyId = accessKeyId;
         this.accessKeySecret = accessKeySecret;
         this.securityToken = securityToken;
         this.expiration = expiration;
+        if (expiration == null) {
+            this.expirationAtMills = null;
+        } else {
+            LocalDateTime dateTime = LocalDateTime.parse(expiration, 
TOKEN_DATE_FORMATTER);
+            // Note: the date time is UTC time zone
+            this.expirationAtMills = 
dateTime.atZone(ZoneOffset.UTC).toInstant().toEpochMilli();
+        }
     }
 
     public String getAccessKeyId() {
@@ -69,8 +90,9 @@ public class DLFToken {
         return securityToken;
     }
 
-    public String getExpiration() {
-        return expiration;
+    @Nullable
+    public Long getExpirationAtMills() {
+        return expirationAtMills;
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFTokenLoader.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFTokenLoader.java
index fb2199e320..0b45284223 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFTokenLoader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFTokenLoader.java
@@ -22,4 +22,6 @@ package org.apache.paimon.rest.auth;
 public interface DLFTokenLoader {
 
     DLFToken loadToken();
+
+    String description();
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/RESTAuthFunction.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/RESTAuthFunction.java
index b76701845f..254eca75ca 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/RESTAuthFunction.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/RESTAuthFunction.java
@@ -25,18 +25,15 @@ import java.util.function.Function;
 public class RESTAuthFunction implements Function<RESTAuthParameter, 
Map<String, String>> {
 
     private final Map<String, String> initHeader;
-    private final AuthSession authSession;
+    private final AuthProvider authProvider;
 
-    public RESTAuthFunction(Map<String, String> initHeader, AuthSession 
authSession) {
+    public RESTAuthFunction(Map<String, String> initHeader, AuthProvider 
authProvider) {
         this.initHeader = initHeader;
-        this.authSession = authSession;
+        this.authProvider = authProvider;
     }
 
     @Override
     public Map<String, String> apply(RESTAuthParameter restAuthParameter) {
-        if (authSession != null) {
-            return authSession.getAuthProvider().header(initHeader, 
restAuthParameter);
-        }
-        return initHeader;
+        return authProvider.mergeAuthHeader(initHeader, restAuthParameter);
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java
index b1f68bd71e..dfdfe5f9b9 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.rest;
 
 import org.apache.paimon.rest.auth.AuthProvider;
-import org.apache.paimon.rest.auth.AuthSession;
 import org.apache.paimon.rest.auth.BearTokenAuthProvider;
 import org.apache.paimon.rest.auth.RESTAuthFunction;
 import org.apache.paimon.rest.auth.RESTAuthParameter;
@@ -85,9 +84,8 @@ public class HttpClientTest {
         httpClient = new HttpClient(server.getBaseUrl());
         httpClient.setErrorHandler(errorHandler);
         AuthProvider authProvider = new BearTokenAuthProvider(TOKEN);
-        AuthSession authSession = new AuthSession(authProvider);
         headers = new HashMap<>();
-        restAuthFunction = new RESTAuthFunction(headers, authSession);
+        restAuthFunction = new RESTAuthFunction(headers, authProvider);
     }
 
     @After
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java
index dab6b0fb34..6b8e83ff52 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java
@@ -99,7 +99,7 @@ class MockRESTCatalogTest extends RESTCatalogTest {
         String akSecret = "akSecret" + UUID.randomUUID();
         String securityToken = "securityToken" + UUID.randomUUID();
         String region = "cn-hangzhou";
-        this.authProvider = DLFAuthProvider.buildAKToken(akId, akSecret, 
securityToken, region);
+        this.authProvider = DLFAuthProvider.fromAccessKey(akId, akSecret, 
securityToken, region);
         this.authMap =
                 ImmutableMap.of(
                         RESTCatalogOptions.TOKEN_PROVIDER.key(), 
AuthProviderEnum.DLF.identifier(),
@@ -122,7 +122,7 @@ class MockRESTCatalogTest extends RESTCatalogTest {
                         new Options(
                                 ImmutableMap.of(
                                         
RESTCatalogOptions.DLF_TOKEN_PATH.key(), tokenPath)));
-        this.authProvider = DLFAuthProvider.buildRefreshToken(tokenLoader, 
1000_000L, region);
+        this.authProvider = DLFAuthProvider.fromTokenLoader(tokenLoader, 
region);
         this.authMap =
                 ImmutableMap.of(
                         RESTCatalogOptions.TOKEN_PROVIDER.key(), 
AuthProviderEnum.DLF.identifier(),
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index 2c4cc894a7..2d539fe5d1 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -247,7 +247,7 @@ public class RESTCatalogServer {
                                     resourcePath, parameters, 
request.getMethod(), data);
                     String authToken =
                             authProvider
-                                    .header(headers, restAuthParameter)
+                                    .mergeAuthHeader(headers, 
restAuthParameter)
                                     .get(AUTHORIZATION_HEADER_KEY);
                     if (!authToken.equals(token)) {
                         return new MockResponse().setResponseCode(401);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index cb8392dd96..fc051ecad4 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -81,7 +81,7 @@ import static 
org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE;
 import static org.apache.paimon.CoreOptions.METASTORE_TAG_TO_PARTITION;
 import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME;
 import static org.apache.paimon.rest.RESTCatalog.PAGE_TOKEN;
-import static org.apache.paimon.rest.auth.DLFAuthProvider.TOKEN_DATE_FORMATTER;
+import static org.apache.paimon.rest.auth.DLFToken.TOKEN_DATE_FORMATTER;
 import static 
org.apache.paimon.utils.SnapshotManagerTest.createSnapshotWithMillis;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthSessionTest.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthProviderTest.java
similarity index 61%
rename from 
paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthSessionTest.java
rename to 
paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthProviderTest.java
index 16285e734d..770dadab5f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthSessionTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthProviderTest.java
@@ -21,7 +21,6 @@ package org.apache.paimon.rest.auth;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.rest.RESTCatalogOptions;
 import org.apache.paimon.utils.Pair;
-import org.apache.paimon.utils.ThreadPoolUtils;
 
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
@@ -29,7 +28,6 @@ import org.apache.commons.io.FileUtils;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
-import org.mockito.Mockito;
 
 import java.io.File;
 import java.io.IOException;
@@ -37,10 +35,9 @@ import java.time.ZoneOffset;
 import java.time.ZonedDateTime;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 import java.util.UUID;
-import java.util.concurrent.ScheduledExecutorService;
 
+import static 
org.apache.paimon.rest.RESTCatalog.TOKEN_EXPIRATION_SAFE_TIME_MILLIS;
 import static org.apache.paimon.rest.RESTCatalogOptions.DLF_ACCESS_KEY_ID;
 import static org.apache.paimon.rest.RESTCatalogOptions.DLF_ACCESS_KEY_SECRET;
 import static org.apache.paimon.rest.RESTCatalogOptions.DLF_REGION;
@@ -50,21 +47,16 @@ import static 
org.apache.paimon.rest.RESTCatalogOptions.DLF_TOKEN_ECS_ROLE_NAME;
 import static org.apache.paimon.rest.RESTCatalogOptions.DLF_TOKEN_LOADER;
 import static org.apache.paimon.rest.RESTCatalogOptions.DLF_TOKEN_PATH;
 import static org.apache.paimon.rest.RESTCatalogOptions.TOKEN;
-import static org.apache.paimon.rest.RESTCatalogOptions.TOKEN_REFRESH_TIME;
-import static 
org.apache.paimon.rest.auth.AuthSession.MAX_REFRESH_WINDOW_MILLIS;
-import static org.apache.paimon.rest.auth.AuthSession.MIN_REFRESH_WAIT_MILLIS;
-import static org.apache.paimon.rest.auth.AuthSession.REFRESH_NUM_RETRIES;
+import static org.apache.paimon.rest.RESTCatalogOptions.TOKEN_PROVIDER;
 import static 
org.apache.paimon.rest.auth.DLFAuthProvider.DLF_AUTHORIZATION_HEADER_KEY;
 import static org.apache.paimon.rest.auth.DLFAuthProvider.DLF_DATE_HEADER_KEY;
-import static org.apache.paimon.rest.auth.DLFAuthProvider.TOKEN_DATE_FORMATTER;
+import static org.apache.paimon.rest.auth.DLFToken.TOKEN_DATE_FORMATTER;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
-/** Test for {@link AuthSession}. */
-public class AuthSessionTest {
+/** Test for {@link AuthProvider}. */
+public class AuthProviderTest {
 
     @Rule public TemporaryFolder folder = new TemporaryFolder();
     private static final ObjectMapper OBJECT_MAPPER_INSTANCE = new 
ObjectMapper();
@@ -77,36 +69,28 @@ public class AuthSessionTest {
         initialHeaders.put("k2", "v2");
         Options options = new Options();
         options.set(TOKEN.key(), token);
-        AuthProvider authProvider =
-                
AuthProviderFactory.createAuthProvider(AuthProviderEnum.BEAR.identifier(), 
options);
-        AuthSession session = new AuthSession(authProvider);
-        Map<String, String> headers = 
session.getAuthProvider().header(initialHeaders, null);
+        options.set(TOKEN_PROVIDER.key(), "bear");
+        AuthProvider authProvider = 
AuthProviderFactory.createAuthProvider(options);
+        Map<String, String> headers = 
authProvider.mergeAuthHeader(initialHeaders, null);
         assertEquals(
                 headers.get(BearTokenAuthProvider.AUTHORIZATION_HEADER_KEY), 
"Bearer " + token);
     }
 
     @Test
-    public void testRefreshDLFAuthTokenFileAuthProvider() throws IOException, 
InterruptedException {
+    public void testRefreshDLFAuthTokenFileAuthProvider() throws IOException {
         String fileName = UUID.randomUUID().toString();
         Pair<File, String> tokenFile2Token = 
generateTokenAndWriteToFile(fileName);
         String theFirstGenerateToken = tokenFile2Token.getRight();
         File tokenFile = tokenFile2Token.getLeft();
-        long tokenRefreshInMills = 1000;
-        AuthProvider authProvider =
-                generateDLFAuthProvider(Optional.of(tokenRefreshInMills), 
fileName, "serverUrl");
-        ScheduledExecutorService executor =
-                ThreadPoolUtils.createScheduledThreadPool(1, "refresh-token");
-        AuthSession session = AuthSession.fromRefreshAuthProvider(executor, 
authProvider);
-        DLFAuthProvider dlfAuthProvider = (DLFAuthProvider) 
session.getAuthProvider();
+        DLFAuthProvider authProvider = generateDLFAuthProvider(fileName, 
"serverUrl");
         String theFirstFetchToken =
-                
OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
+                
OBJECT_MAPPER_INSTANCE.writeValueAsString(authProvider.getFreshToken());
         assertEquals(theFirstFetchToken, theFirstGenerateToken);
         tokenFile.delete();
         tokenFile2Token = generateTokenAndWriteToFile(fileName);
         String theSecondGenerateToken = tokenFile2Token.getRight();
-        Thread.sleep(tokenRefreshInMills * 2);
         String theSecondFetchToken =
-                
OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
+                
OBJECT_MAPPER_INSTANCE.writeValueAsString(authProvider.getFreshToken());
         // if the second fetch token is not equal to the first fetch token, it 
means refresh success
         // as refresh maybe fail in test environment, so we need to check 
whether refresh success
         if (!theSecondFetchToken.equals(theFirstFetchToken)) {
@@ -115,62 +99,23 @@ public class AuthSessionTest {
     }
 
     @Test
-    public void testRefreshAuthProviderIsSoonExpire() throws IOException, 
InterruptedException {
+    public void testRefreshAuthProviderIsSoonExpire() throws IOException {
         String fileName = UUID.randomUUID().toString();
         Pair<File, String> tokenFile2Token = 
generateTokenAndWriteToFile(fileName);
         String token = tokenFile2Token.getRight();
         File tokenFile = tokenFile2Token.getLeft();
-        long tokenRefreshInMills = 5000L;
-        AuthProvider authProvider =
-                generateDLFAuthProvider(Optional.of(tokenRefreshInMills), 
fileName, "serverUrl");
-        AuthSession session = AuthSession.fromRefreshAuthProvider(null, 
authProvider);
-        DLFAuthProvider dlfAuthProvider = (DLFAuthProvider) 
session.getAuthProvider();
-        String authToken = 
OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
+        DLFAuthProvider authProvider = generateDLFAuthProvider(fileName, 
"serverUrl");
+        String authToken = 
OBJECT_MAPPER_INSTANCE.writeValueAsString(authProvider.getFreshToken());
         assertEquals(token, authToken);
-        Thread.sleep((long) (tokenRefreshInMills * (1 - 
DLFAuthProvider.EXPIRED_FACTOR)) + 10L);
         tokenFile.delete();
         tokenFile2Token = generateTokenAndWriteToFile(fileName);
         token = tokenFile2Token.getRight();
         tokenFile = tokenFile2Token.getLeft();
         FileUtils.writeStringToFile(tokenFile, token);
-        dlfAuthProvider = (DLFAuthProvider) session.getAuthProvider();
-        authToken = 
OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
+        authToken = 
OBJECT_MAPPER_INSTANCE.writeValueAsString(authProvider.getFreshToken());
         assertEquals(token, authToken);
     }
 
-    @Test
-    public void testRetryWhenRefreshFail() throws Exception {
-        AuthProvider authProvider = Mockito.mock(DLFAuthProvider.class);
-        long expiresAtMillis = System.currentTimeMillis() - 1000L;
-        
when(authProvider.expiresAtMillis()).thenReturn(Optional.of(expiresAtMillis));
-        when(authProvider.tokenRefreshInMills()).thenReturn(Optional.of(50L));
-        when(authProvider.keepRefreshed()).thenReturn(true);
-        when(authProvider.refresh()).thenReturn(false);
-        AuthSession session = AuthSession.fromRefreshAuthProvider(null, 
authProvider);
-        AuthSession.scheduleTokenRefresh(
-                ThreadPoolUtils.createScheduledThreadPool(1, "refresh-token"),
-                session,
-                expiresAtMillis);
-        Thread.sleep(10_000L);
-        verify(authProvider, Mockito.times(REFRESH_NUM_RETRIES + 1)).refresh();
-    }
-
-    @Test
-    public void testGetTimeToWaitByExpiresInMills() {
-        long expiresInMillis = -100L;
-        long timeToWait = 
AuthSession.getTimeToWaitByExpiresInMills(expiresInMillis);
-        assertEquals(MIN_REFRESH_WAIT_MILLIS, timeToWait);
-        expiresInMillis = (long) (MAX_REFRESH_WINDOW_MILLIS * 0.5);
-        timeToWait = 
AuthSession.getTimeToWaitByExpiresInMills(expiresInMillis);
-        assertEquals(MIN_REFRESH_WAIT_MILLIS, timeToWait);
-        expiresInMillis = MAX_REFRESH_WINDOW_MILLIS;
-        timeToWait = 
AuthSession.getTimeToWaitByExpiresInMills(expiresInMillis);
-        assertEquals(timeToWait, MIN_REFRESH_WAIT_MILLIS);
-        expiresInMillis = MAX_REFRESH_WINDOW_MILLIS * 2L;
-        timeToWait = 
AuthSession.getTimeToWaitByExpiresInMills(expiresInMillis);
-        assertEquals(timeToWait, MAX_REFRESH_WINDOW_MILLIS);
-    }
-
     @Test
     public void testCreateDLFAuthProviderByStsToken() throws IOException {
         Options options = new Options();
@@ -182,11 +127,10 @@ public class AuthSessionTest {
         options.set(DLF_ACCESS_KEY_SECRET.key(), token.getAccessKeySecret());
         options.set(DLF_SECURITY_TOKEN.key(), token.getSecurityToken());
         options.set(DLF_REGION.key(), "cn-hangzhou");
-        AuthProvider authProvider =
-                
AuthProviderFactory.createAuthProvider(AuthProviderEnum.DLF.identifier(), 
options);
-        AuthSession session = AuthSession.fromRefreshAuthProvider(null, 
authProvider);
-        DLFAuthProvider dlfAuthProvider = (DLFAuthProvider) 
session.getAuthProvider();
-        String authToken = 
OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
+        options.set(TOKEN_PROVIDER, "dlf");
+        DLFAuthProvider authProvider =
+                (DLFAuthProvider) 
AuthProviderFactory.createAuthProvider(options);
+        String authToken = 
OBJECT_MAPPER_INSTANCE.writeValueAsString(authProvider.getFreshToken());
         assertEquals(OBJECT_MAPPER_INSTANCE.writeValueAsString(token), 
authToken);
     }
 
@@ -199,11 +143,10 @@ public class AuthSessionTest {
         options.set(DLF_ACCESS_KEY_ID.key(), token.getAccessKeyId());
         options.set(DLF_ACCESS_KEY_SECRET.key(), token.getAccessKeySecret());
         options.set(DLF_REGION.key(), "cn-hangzhou");
-        AuthProvider authProvider =
-                
AuthProviderFactory.createAuthProvider(AuthProviderEnum.DLF.identifier(), 
options);
-        AuthSession session = AuthSession.fromRefreshAuthProvider(null, 
authProvider);
-        DLFAuthProvider dlfAuthProvider = (DLFAuthProvider) 
session.getAuthProvider();
-        String authToken = 
OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
+        options.set(TOKEN_PROVIDER, "dlf");
+        DLFAuthProvider authProvider =
+                (DLFAuthProvider) 
AuthProviderFactory.createAuthProvider(options);
+        String authToken = 
OBJECT_MAPPER_INSTANCE.writeValueAsString(authProvider.getFreshToken());
         assertEquals(OBJECT_MAPPER_INSTANCE.writeValueAsString(token), 
authToken);
     }
 
@@ -212,55 +155,43 @@ public class AuthSessionTest {
         String fileName = UUID.randomUUID().toString();
         Pair<File, String> tokenFile2Token = 
generateTokenAndWriteToFile(fileName);
         String token = tokenFile2Token.getRight();
-        AuthProvider authProvider =
-                generateDLFAuthProvider(Optional.empty(), fileName, 
"serverUrl");
-        ScheduledExecutorService executor =
-                ThreadPoolUtils.createScheduledThreadPool(1, "refresh-token");
-        AuthSession session = AuthSession.fromRefreshAuthProvider(executor, 
authProvider);
-        DLFAuthProvider dlfAuthProvider = (DLFAuthProvider) 
session.getAuthProvider();
-        String authToken = 
OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
+        DLFAuthProvider authProvider = generateDLFAuthProvider(fileName, 
"serverUrl");
+        String authToken = 
OBJECT_MAPPER_INSTANCE.writeValueAsString(authProvider.getFreshToken());
         assertEquals(authToken, token);
     }
 
     @Test
     public void testCreateDLFAuthProviderWithoutNeedConf() {
+        Options options = new Options();
+        options.set(TOKEN_PROVIDER, "dlf");
         assertThrows(
                 IllegalArgumentException.class,
-                () ->
-                        AuthProviderFactory.createAuthProvider(
-                                AuthProviderEnum.DLF.identifier(), new 
Options()));
+                () -> AuthProviderFactory.createAuthProvider(new Options()));
     }
 
     @Test
-    public void testCreateDlfAuthProviderByDLFTokenLoader()
-            throws IOException, InterruptedException {
+    public void testCreateDlfAuthProviderByDLFTokenLoader() throws IOException 
{
         String fileName = UUID.randomUUID().toString();
         Pair<File, String> tokenFile2Token = 
generateTokenAndWriteToFile(fileName);
         String theFirstGenerateToken = tokenFile2Token.getRight();
         File tokenFile = tokenFile2Token.getLeft();
-        long tokenRefreshInMills = 1000;
         // create options with token loader
         Options options = new Options();
         options.set(DLF_TOKEN_LOADER.key(), "local_file");
         options.set(DLF_TOKEN_PATH.key(), folder.getRoot().getPath() + "/" + 
fileName);
         options.set(RESTCatalogOptions.URI.key(), "serverUrl");
         options.set(DLF_REGION.key(), "cn-hangzhou");
-        options.set(TOKEN_REFRESH_TIME.key(), tokenRefreshInMills + "ms");
-        AuthProvider authProvider =
-                
AuthProviderFactory.createAuthProvider(AuthProviderEnum.DLF.identifier(), 
options);
-        ScheduledExecutorService executor =
-                ThreadPoolUtils.createScheduledThreadPool(1, "refresh-token");
-        AuthSession session = AuthSession.fromRefreshAuthProvider(executor, 
authProvider);
-        DLFAuthProvider dlfAuthProvider = (DLFAuthProvider) 
session.getAuthProvider();
+        options.set(TOKEN_PROVIDER, "dlf");
+        DLFAuthProvider authProvider =
+                (DLFAuthProvider) 
AuthProviderFactory.createAuthProvider(options);
         String theFirstFetchToken =
-                
OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
+                
OBJECT_MAPPER_INSTANCE.writeValueAsString(authProvider.getFreshToken());
         assertEquals(theFirstFetchToken, theFirstGenerateToken);
         tokenFile.delete();
         tokenFile2Token = generateTokenAndWriteToFile(fileName);
         String theSecondGenerateToken = tokenFile2Token.getRight();
-        Thread.sleep(tokenRefreshInMills * 2);
         String theSecondFetchToken =
-                
OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
+                
OBJECT_MAPPER_INSTANCE.writeValueAsString(authProvider.getFreshToken());
         // if the second fetch token is not equal to the first fetch token, it 
means refresh success
         // as refresh maybe fail in test environment, so we need to check 
whether refresh success
         if (!theSecondFetchToken.equals(theFirstFetchToken)) {
@@ -269,8 +200,7 @@ public class AuthSessionTest {
     }
 
     @Test
-    public void testCreateDlfAuthProviderByCustomDLFTokenLoader()
-            throws IOException, InterruptedException {
+    public void testCreateDlfAuthProviderByCustomDLFTokenLoader() {
         DLFToken customToken = generateToken();
         // create options with custom token loader
         Options options = new Options();
@@ -280,21 +210,17 @@ public class AuthSessionTest {
         options.set(DLF_SECURITY_TOKEN.key(), customToken.getSecurityToken());
         options.set(RESTCatalogOptions.URI.key(), "serverUrl");
         options.set(DLF_REGION.key(), "cn-hangzhou");
-        AuthProvider authProvider =
-                
AuthProviderFactory.createAuthProvider(AuthProviderEnum.DLF.identifier(), 
options);
-        ScheduledExecutorService executor =
-                ThreadPoolUtils.createScheduledThreadPool(1, "refresh-token");
-        AuthSession session = AuthSession.fromRefreshAuthProvider(executor, 
authProvider);
-        DLFAuthProvider dlfAuthProvider = (DLFAuthProvider) 
session.getAuthProvider();
-        DLFToken fetchToken = dlfAuthProvider.token;
+        options.set(TOKEN_PROVIDER, "dlf");
+        DLFAuthProvider authProvider =
+                (DLFAuthProvider) 
AuthProviderFactory.createAuthProvider(options);
+        DLFToken fetchToken = authProvider.getFreshToken();
         assertEquals(fetchToken.getAccessKeyId(), 
customToken.getAccessKeyId());
         assertEquals(fetchToken.getAccessKeySecret(), 
customToken.getAccessKeySecret());
         assertEquals(fetchToken.getSecurityToken(), 
customToken.getSecurityToken());
     }
 
     @Test
-    public void testCreateDlfAuthProviderByECSTokenProvider()
-            throws IOException, InterruptedException {
+    public void testCreateDlfAuthProviderByECSTokenProvider() throws 
IOException {
         MockECSMetadataService mockECSMetadataService = new 
MockECSMetadataService("EcsTestRole");
         mockECSMetadataService.start();
         try {
@@ -302,7 +228,6 @@ public class AuthSessionTest {
             mockECSMetadataService.setMockToken(theFirstMockToken);
             String theFirstMockTokenStr =
                     
OBJECT_MAPPER_INSTANCE.writeValueAsString(theFirstMockToken);
-            long tokenRefreshInMills = 1000;
             // create options with token loader
             Options options = new Options();
             options.set(DLF_TOKEN_LOADER.key(), "ecs");
@@ -311,34 +236,41 @@ public class AuthSessionTest {
                     mockECSMetadataService.getUrl() + 
"latest/meta-data/Ram/security-credentials/");
             options.set(RESTCatalogOptions.URI.key(), "serverUrl");
             options.set(DLF_REGION.key(), "cn-hangzhou");
-            options.set(TOKEN_REFRESH_TIME.key(), tokenRefreshInMills + "ms");
-            AuthProvider authProvider =
-                    AuthProviderFactory.createAuthProvider(
-                            AuthProviderEnum.DLF.identifier(), options);
-            ScheduledExecutorService executor =
-                    ThreadPoolUtils.createScheduledThreadPool(1, 
"refresh-token");
-            AuthSession session = 
AuthSession.fromRefreshAuthProvider(executor, authProvider);
-            DLFAuthProvider dlfAuthProvider = (DLFAuthProvider) 
session.getAuthProvider();
+            options.set(TOKEN_PROVIDER, "dlf");
+
+            DLFAuthProvider authProvider =
+                    (DLFAuthProvider) 
AuthProviderFactory.createAuthProvider(options);
+
+            // first token
             String theFirstFetchTokenStr =
-                    
OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
+                    
OBJECT_MAPPER_INSTANCE.writeValueAsString(authProvider.getFreshToken());
             assertEquals(theFirstFetchTokenStr, theFirstMockTokenStr);
 
-            DLFToken theSecondMockToken = generateToken();
+            // second token
+            DLFToken theSecondMockToken =
+                    generateToken(
+                            ZonedDateTime.now(ZoneOffset.UTC)
+                                    
.plusSeconds(TOKEN_EXPIRATION_SAFE_TIME_MILLIS * 2 / 1000));
             String theSecondMockTokenStr =
                     
OBJECT_MAPPER_INSTANCE.writeValueAsString(theSecondMockToken);
             mockECSMetadataService.setMockToken(theSecondMockToken);
-            Thread.sleep(tokenRefreshInMills * 2);
             String theSecondFetchTokenStr =
-                    
OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
+                    
OBJECT_MAPPER_INSTANCE.writeValueAsString(authProvider.getFreshToken());
             assertEquals(theSecondFetchTokenStr, theSecondMockTokenStr);
+
+            // third token, should not refresh
+            DLFToken theThirdMockToken = generateToken();
+            mockECSMetadataService.setMockToken(theThirdMockToken);
+            String theThirdFetchTokenStr =
+                    
OBJECT_MAPPER_INSTANCE.writeValueAsString(authProvider.getFreshToken());
+            assertEquals(theThirdFetchTokenStr, theSecondMockTokenStr);
         } finally {
             mockECSMetadataService.shutdown();
         }
     }
 
     @Test
-    public void testCreateDlfAuthProviderByECSTokenProviderWithDefineRole()
-            throws IOException, InterruptedException {
+    public void testCreateDlfAuthProviderByECSTokenProviderWithDefineRole() 
throws IOException {
         MockECSMetadataService mockECSMetadataService = new 
MockECSMetadataService("CustomRole");
         mockECSMetadataService.start();
         try {
@@ -346,7 +278,6 @@ public class AuthSessionTest {
             mockECSMetadataService.setMockToken(theFirstMockToken);
             String theFirstMockTokenStr =
                     
OBJECT_MAPPER_INSTANCE.writeValueAsString(theFirstMockToken);
-            long tokenRefreshInMills = 1000;
             // create options with token loader
             Options options = new Options();
             options.set(DLF_TOKEN_LOADER.key(), "ecs");
@@ -356,25 +287,19 @@ public class AuthSessionTest {
             options.set(DLF_TOKEN_ECS_ROLE_NAME.key(), "CustomRole");
             options.set(RESTCatalogOptions.URI.key(), "serverUrl");
             options.set(DLF_REGION.key(), "cn-hangzhou");
-            options.set(TOKEN_REFRESH_TIME.key(), tokenRefreshInMills + "ms");
-            AuthProvider authProvider =
-                    AuthProviderFactory.createAuthProvider(
-                            AuthProviderEnum.DLF.identifier(), options);
-            ScheduledExecutorService executor =
-                    ThreadPoolUtils.createScheduledThreadPool(1, 
"refresh-token");
-            AuthSession session = 
AuthSession.fromRefreshAuthProvider(executor, authProvider);
-            DLFAuthProvider dlfAuthProvider = (DLFAuthProvider) 
session.getAuthProvider();
+            options.set(TOKEN_PROVIDER, "dlf");
+            DLFAuthProvider authProvider =
+                    (DLFAuthProvider) 
AuthProviderFactory.createAuthProvider(options);
             String theFirstFetchTokenStr =
-                    
OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
+                    
OBJECT_MAPPER_INSTANCE.writeValueAsString(authProvider.getFreshToken());
             assertEquals(theFirstFetchTokenStr, theFirstMockTokenStr);
 
             DLFToken theSecondMockToken = generateToken();
             String theSecondMockTokenStr =
                     
OBJECT_MAPPER_INSTANCE.writeValueAsString(theSecondMockToken);
             mockECSMetadataService.setMockToken(theSecondMockToken);
-            Thread.sleep(tokenRefreshInMills * 2);
             String theSecondFetchTokenStr =
-                    
OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
+                    
OBJECT_MAPPER_INSTANCE.writeValueAsString(authProvider.getFreshToken());
             assertEquals(theSecondFetchTokenStr, theSecondMockTokenStr);
         } finally {
             mockECSMetadataService.shutdown();
@@ -382,8 +307,7 @@ public class AuthSessionTest {
     }
 
     @Test
-    public void testCreateDlfAuthProviderByECSTokenProviderWithInvalidRole()
-            throws IOException, InterruptedException {
+    public void testCreateDlfAuthProviderByECSTokenProviderWithInvalidRole() 
throws IOException {
         MockECSMetadataService mockECSMetadataService = new 
MockECSMetadataService("EcsTestRole");
         mockECSMetadataService.start();
         try {
@@ -398,13 +322,10 @@ public class AuthSessionTest {
             options.set(DLF_TOKEN_ECS_ROLE_NAME.key(), "CustomRole");
             options.set(RESTCatalogOptions.URI.key(), "serverUrl");
             options.set(DLF_REGION.key(), "cn-hangzhou");
-            assertThrows(
-                    RuntimeException.class,
-                    () -> {
-                        AuthProvider authProvider =
-                                AuthProviderFactory.createAuthProvider(
-                                        AuthProviderEnum.DLF.identifier(), 
options);
-                    });
+            options.set(TOKEN_PROVIDER, "dlf");
+            DLFAuthProvider authProvider =
+                    (DLFAuthProvider) 
AuthProviderFactory.createAuthProvider(options);
+            assertThrows(RuntimeException.class, authProvider::getFreshToken);
         } finally {
             mockECSMetadataService.shutdown();
         }
@@ -416,7 +337,7 @@ public class AuthSessionTest {
         Pair<File, String> tokenFile2Token = 
generateTokenAndWriteToFile(fileName);
         String tokenStr = tokenFile2Token.getRight();
         String serverUrl = "https://dlf-cn-hangzhou.aliyuncs.com";;
-        AuthProvider authProvider = generateDLFAuthProvider(Optional.empty(), 
fileName, serverUrl);
+        AuthProvider authProvider = generateDLFAuthProvider(fileName, 
serverUrl);
         DLFToken token = OBJECT_MAPPER_INSTANCE.readValue(tokenStr, 
DLFToken.class);
         Map<String, String> parameters = new HashMap<>();
         parameters.put("k1", "v1");
@@ -424,7 +345,8 @@ public class AuthSessionTest {
         String data = "data";
         RESTAuthParameter restAuthParameter =
                 new RESTAuthParameter("/path", parameters, "method", "data");
-        Map<String, String> header = authProvider.header(new HashMap<>(), 
restAuthParameter);
+        Map<String, String> header =
+                authProvider.mergeAuthHeader(new HashMap<>(), 
restAuthParameter);
         String authorization = header.get(DLF_AUTHORIZATION_HEADER_KEY);
         String[] credentials = authorization.split(",")[0].split(" 
")[1].split("/");
         String dateTime = header.get(DLF_DATE_HEADER_KEY);
@@ -456,8 +378,7 @@ public class AuthSessionTest {
 
     private Pair<File, String> generateTokenAndWriteToFile(String fileName) 
throws IOException {
         File tokenFile = folder.newFile(fileName);
-        ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
-        String expiration = now.format(TOKEN_DATE_FORMATTER);
+        String expiration = 
ZonedDateTime.now(ZoneOffset.UTC).format(TOKEN_DATE_FORMATTER);
         String secret = UUID.randomUUID().toString();
         DLFToken token = new DLFToken("accessKeyId", secret, "securityToken", 
expiration);
         String tokenStr = OBJECT_MAPPER_INSTANCE.writeValueAsString(token);
@@ -466,23 +387,23 @@ public class AuthSessionTest {
     }
 
     private DLFToken generateToken() {
+        return generateToken(ZonedDateTime.now(ZoneOffset.UTC));
+    }
+
+    private DLFToken generateToken(ZonedDateTime expireTime) {
         String accessKeyId = UUID.randomUUID().toString();
         String accessKeySecret = UUID.randomUUID().toString();
         String securityToken = UUID.randomUUID().toString();
-        ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
-        String expiration = now.format(TOKEN_DATE_FORMATTER);
+        String expiration = expireTime.format(TOKEN_DATE_FORMATTER);
         return new DLFToken(accessKeyId, accessKeySecret, securityToken, 
expiration);
     }
 
-    private AuthProvider generateDLFAuthProvider(
-            Optional<Long> tokenRefreshInMillsOpt, String fileName, String 
serverUrl) {
+    private DLFAuthProvider generateDLFAuthProvider(String fileName, String 
serverUrl) {
         Options options = new Options();
         options.set(DLF_TOKEN_PATH.key(), folder.getRoot().getPath() + "/" + 
fileName);
         options.set(RESTCatalogOptions.URI.key(), serverUrl);
         options.set(DLF_REGION.key(), "cn-hangzhou");
-        tokenRefreshInMillsOpt.ifPresent(
-                tokenRefreshInMills ->
-                        options.set(TOKEN_REFRESH_TIME.key(), 
tokenRefreshInMills + "ms"));
-        return 
AuthProviderFactory.createAuthProvider(AuthProviderEnum.DLF.identifier(), 
options);
+        options.set(TOKEN_PROVIDER, "dlf");
+        return (DLFAuthProvider) 
AuthProviderFactory.createAuthProvider(options);
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/auth/CustomTestDLFTokenLoader.java
 
b/paimon-core/src/test/java/org/apache/paimon/rest/auth/CustomTestDLFTokenLoader.java
index 279c70080b..139df16dea 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/rest/auth/CustomTestDLFTokenLoader.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/rest/auth/CustomTestDLFTokenLoader.java
@@ -21,7 +21,7 @@ package org.apache.paimon.rest.auth;
 import java.time.ZoneOffset;
 import java.time.ZonedDateTime;
 
-import static org.apache.paimon.rest.auth.DLFAuthProvider.TOKEN_DATE_FORMATTER;
+import static org.apache.paimon.rest.auth.DLFToken.TOKEN_DATE_FORMATTER;
 
 /** DLF Token Loader for custom. */
 public class CustomTestDLFTokenLoader implements DLFTokenLoader {
@@ -39,4 +39,9 @@ public class CustomTestDLFTokenLoader implements 
DLFTokenLoader {
     public DLFToken loadToken() {
         return token;
     }
+
+    @Override
+    public String description() {
+        return "custom";
+    }
 }

Reply via email to