This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 98e46495cd [rest] Simplify auth provider to synchronously refresh token (#5562)
98e46495cd is described below
commit 98e46495cd4849ab3be73916cf8c15ea6e5c5a3c
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Apr 30 23:17:49 2025 +0800
[rest] Simplify auth provider to synchronously refresh token (#5562)
---
.../java/org/apache/paimon/rest/HttpClient.java | 3 -
.../java/org/apache/paimon/rest/RESTCatalog.java | 36 +--
.../org/apache/paimon/rest/RESTCatalogOptions.java | 8 -
.../java/org/apache/paimon/rest/RESTClient.java | 3 +-
.../org/apache/paimon/rest/RESTTokenFileIO.java | 9 +-
.../org/apache/paimon/rest/auth/AuthProvider.java | 22 +-
.../paimon/rest/auth/AuthProviderFactory.java | 11 +-
.../org/apache/paimon/rest/auth/AuthSession.java | 152 -------------
.../paimon/rest/auth/BearTokenAuthProvider.java | 7 +-
.../apache/paimon/rest/auth/DLFAuthProvider.java | 138 +++++-------
.../paimon/rest/auth/DLFAuthProviderFactory.java | 9 +-
.../apache/paimon/rest/auth/DLFECSTokenLoader.java | 7 +-
.../paimon/rest/auth/DLFLocalFileTokenLoader.java | 42 ++--
.../java/org/apache/paimon/rest/auth/DLFToken.java | 28 ++-
.../apache/paimon/rest/auth/DLFTokenLoader.java | 2 +
.../apache/paimon/rest/auth/RESTAuthFunction.java | 11 +-
.../org/apache/paimon/rest/HttpClientTest.java | 4 +-
.../apache/paimon/rest/MockRESTCatalogTest.java | 4 +-
.../org/apache/paimon/rest/RESTCatalogServer.java | 2 +-
.../org/apache/paimon/rest/RESTCatalogTest.java | 2 +-
...{AuthSessionTest.java => AuthProviderTest.java} | 247 +++++++--------------
.../paimon/rest/auth/CustomTestDLFTokenLoader.java | 7 +-
22 files changed, 241 insertions(+), 513 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java
b/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java
index dca501b693..12f7f2938a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java
@@ -241,7 +241,4 @@ public class HttpClient implements RESTClient {
new RESTAuthParameter(path, queryParams, method, data);
return headerFunction.apply(restAuthParameter);
}
-
- @Override
- public void close() {}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index 50c8ebbe65..01e4e75e95 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -33,7 +33,7 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.partition.PartitionStatistics;
-import org.apache.paimon.rest.auth.AuthSession;
+import org.apache.paimon.rest.auth.AuthProvider;
import org.apache.paimon.rest.auth.RESTAuthFunction;
import org.apache.paimon.rest.auth.RESTAuthParameter;
import org.apache.paimon.rest.exceptions.AlreadyExistsException;
@@ -102,7 +102,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -118,8 +117,7 @@ import static
org.apache.paimon.catalog.CatalogUtils.validateAutoCreateClose;
import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE;
import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
import static org.apache.paimon.rest.RESTUtil.extractPrefixMap;
-import static org.apache.paimon.rest.auth.AuthSession.createAuthSession;
-import static
org.apache.paimon.utils.ThreadPoolUtils.createScheduledThreadPool;
+import static
org.apache.paimon.rest.auth.AuthProviderFactory.createAuthProvider;
/** A catalog implementation for REST. */
public class RESTCatalog implements Catalog {
@@ -132,6 +130,7 @@ public class RESTCatalog implements Catalog {
public static final String TABLE_NAME_PATTERN = "tableNamePattern";
public static final String VIEW_NAME_PATTERN = "viewNamePattern";
public static final String PARTITION_NAME_PATTERN = "partitionNamePattern";
+ public static final long TOKEN_EXPIRATION_SAFE_TIME_MILLIS = 3_600_000L;
private final RESTClient client;
private final ResourcePaths resourcePaths;
@@ -139,15 +138,13 @@ public class RESTCatalog implements Catalog {
private final boolean dataTokenEnabled;
private final RESTAuthFunction restAuthFunction;
- private volatile ScheduledExecutorService refreshExecutor = null;
-
public RESTCatalog(CatalogContext context) {
this(context, true);
}
public RESTCatalog(CatalogContext context, boolean configRequired) {
this.client = new
HttpClient(context.options().get(RESTCatalogOptions.URI));
- AuthSession catalogAuth = createAuthSession(context.options(),
tokenRefreshExecutor());
+ AuthProvider authProvider = createAuthProvider(context.options());
Options options = context.options();
Map<String, String> baseHeaders = Collections.emptyMap();
if (configRequired) {
@@ -165,11 +162,11 @@ public class RESTCatalog implements Catalog {
queryParams,
ConfigResponse.class,
new RESTAuthFunction(
- Collections.emptyMap(),
catalogAuth))
+ Collections.emptyMap(),
authProvider))
.merge(context.options().toMap()));
baseHeaders.putAll(extractPrefixMap(options, HEADER_PREFIX));
}
- this.restAuthFunction = new RESTAuthFunction(baseHeaders, catalogAuth);
+ this.restAuthFunction = new RESTAuthFunction(baseHeaders,
authProvider);
context = CatalogContext.create(options, context.preferIO(),
context.fallbackIO());
this.context = context;
this.resourcePaths = ResourcePaths.forCatalogProperties(options);
@@ -983,14 +980,7 @@ public class RESTCatalog implements Catalog {
}
@Override
- public void close() throws Exception {
- if (refreshExecutor != null) {
- refreshExecutor.shutdownNow();
- }
- if (client != null) {
- client.close();
- }
- }
+ public void close() throws Exception {}
@VisibleForTesting
Map<String, String> headers(RESTAuthParameter restAuthParameter) {
@@ -1036,18 +1026,6 @@ public class RESTCatalog implements Catalog {
return results;
}
- private ScheduledExecutorService tokenRefreshExecutor() {
- if (refreshExecutor == null) {
- synchronized (this) {
- if (refreshExecutor == null) {
- this.refreshExecutor = createScheduledThreadPool(1,
"token-refresh-thread");
- }
- }
- }
-
- return refreshExecutor;
- }
-
private FileIO fileIOForData(Path path, Identifier identifier) {
return dataTokenEnabled
? new RESTTokenFileIO(catalogLoader(), this, identifier, path)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java
index 0359f6fa25..8c5cc586b2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java
@@ -21,8 +21,6 @@ package org.apache.paimon.rest;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.ConfigOptions;
-import java.time.Duration;
-
/** Options for REST Catalog. */
public class RESTCatalogOptions {
@@ -38,12 +36,6 @@ public class RESTCatalogOptions {
.noDefaultValue()
.withDescription("REST Catalog auth bear token.");
- public static final ConfigOption<Duration> TOKEN_REFRESH_TIME =
- ConfigOptions.key("token.refresh-time")
- .durationType()
- .defaultValue(Duration.ofHours(1))
- .withDescription("REST Catalog auth token refresh time.");
-
public static final ConfigOption<String> TOKEN_PROVIDER =
ConfigOptions.key("token.provider")
.stringType()
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTClient.java
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTClient.java
index ac1a27a2be..b2058ec806 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTClient.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTClient.java
@@ -20,11 +20,10 @@ package org.apache.paimon.rest;
import org.apache.paimon.rest.auth.RESTAuthFunction;
-import java.io.Closeable;
import java.util.Map;
/** Interface for a basic HTTP Client for interfacing with the REST catalog. */
-public interface RESTClient extends Closeable {
+public interface RESTClient {
<T extends RESTResponse> T get(
String path, Class<T> responseType, RESTAuthFunction
restAuthFunction);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java
index bcf7840dc5..13e31e8634 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java
@@ -46,6 +46,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static org.apache.paimon.options.CatalogOptions.FILE_IO_ALLOW_CACHE;
+import static
org.apache.paimon.rest.RESTCatalog.TOKEN_EXPIRATION_SAFE_TIME_MILLIS;
/** A {@link FileIO} to support getting token from REST Server. */
public class RESTTokenFileIO implements FileIO {
@@ -191,11 +192,13 @@ public class RESTTokenFileIO implements FileIO {
}
private boolean shouldRefresh() {
- return token == null || token.expireAtMillis() -
System.currentTimeMillis() < 3600_000L;
+ return token == null
+ || token.expireAtMillis() - System.currentTimeMillis()
+ < TOKEN_EXPIRATION_SAFE_TIME_MILLIS;
}
private void refreshToken() {
- LOG.info("begin refresh token for identifier [{}]", identifier);
+ LOG.info("begin refresh data token for identifier [{}]", identifier);
GetTableTokenResponse response;
if (catalogInstance != null) {
try {
@@ -211,7 +214,7 @@ public class RESTTokenFileIO implements FileIO {
}
}
LOG.info(
- "end refresh token for identifier [{}] expiresAtMillis [{}]",
+ "end refresh data token for identifier [{}] expiresAtMillis
[{}]",
identifier,
response.getExpiresAtMillis());
diff --git
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthProvider.java
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthProvider.java
index 666337ec79..d7755ff053 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthProvider.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthProvider.java
@@ -19,28 +19,10 @@
package org.apache.paimon.rest.auth;
import java.util.Map;
-import java.util.Optional;
/** Authentication provider. */
public interface AuthProvider {
- Map<String, String> header(Map<String, String> baseHeader,
RESTAuthParameter restAuthParameter);
-
- boolean refresh();
-
- default boolean keepRefreshed() {
- return false;
- }
-
- default boolean willSoonExpire() {
- return false;
- }
-
- default Optional<Long> expiresAtMillis() {
- return Optional.empty();
- }
-
- default Optional<Long> tokenRefreshInMills() {
- return Optional.empty();
- }
+ Map<String, String> mergeAuthHeader(
+ Map<String, String> baseHeader, RESTAuthParameter
restAuthParameter);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthProviderFactory.java
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthProviderFactory.java
index 1cccfeb626..7562451a34 100644
---
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthProviderFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthProviderFactory.java
@@ -21,18 +21,25 @@ package org.apache.paimon.rest.auth;
import org.apache.paimon.factories.Factory;
import org.apache.paimon.factories.FactoryUtil;
import org.apache.paimon.options.Options;
+import org.apache.paimon.utils.StringUtils;
+
+import static org.apache.paimon.rest.RESTCatalogOptions.TOKEN_PROVIDER;
/** Factory for {@link AuthProvider}. */
public interface AuthProviderFactory extends Factory {
AuthProvider create(Options options);
- static AuthProvider createAuthProvider(String name, Options options) {
+ static AuthProvider createAuthProvider(Options options) {
+ String tokenProvider = options.get(TOKEN_PROVIDER);
+ if (StringUtils.isEmpty(tokenProvider)) {
+ throw new IllegalArgumentException("token.provider is not set.");
+ }
AuthProviderFactory factory =
FactoryUtil.discoverFactory(
AuthProviderFactory.class.getClassLoader(),
AuthProviderFactory.class,
- name);
+ tokenProvider);
return factory.create(options);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthSession.java
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthSession.java
deleted file mode 100644
index 1ce391e00d..0000000000
--- a/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthSession.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.rest.auth;
-
-import org.apache.paimon.annotation.VisibleForTesting;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.rest.RESTCatalogOptions;
-import org.apache.paimon.utils.StringUtils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Optional;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-/** Authentication session. */
-public class AuthSession {
-
- static final int REFRESH_NUM_RETRIES = 5;
- static final long MIN_REFRESH_WAIT_MILLIS = 10;
- static final long MAX_REFRESH_WINDOW_MILLIS = 300_000; // 5 minutes
-
- private static final Logger LOG =
LoggerFactory.getLogger(AuthSession.class);
-
- private final AuthProvider authProvider;
-
- public AuthSession(AuthProvider authProvider) {
- this.authProvider = authProvider;
- }
-
- public static AuthSession fromRefreshAuthProvider(
- ScheduledExecutorService executor, AuthProvider authProvider) {
- AuthSession session = new AuthSession(authProvider);
-
- long startTimeMillis = System.currentTimeMillis();
- Optional<Long> expiresAtMillisOpt = authProvider.expiresAtMillis();
-
- // when init session if token expire time is in the past, refresh it
and update
- // expiresAtMillis
- if (expiresAtMillisOpt.isPresent() && expiresAtMillisOpt.get() <=
startTimeMillis) {
- boolean refreshSuccessful = session.refresh();
- if (refreshSuccessful) {
- expiresAtMillisOpt = session.authProvider.expiresAtMillis();
- }
- }
-
- if (null != executor && expiresAtMillisOpt.isPresent()) {
- scheduleTokenRefresh(executor, session, expiresAtMillisOpt.get());
- }
-
- return session;
- }
-
- public AuthProvider getAuthProvider() {
- if (this.authProvider.keepRefreshed() &&
this.authProvider.willSoonExpire()) {
- refresh();
- }
- return this.authProvider;
- }
-
- public Boolean refresh() {
- if (this.authProvider.keepRefreshed()
- && this.authProvider.tokenRefreshInMills().isPresent()) {
- return this.authProvider.refresh();
- }
-
- return false;
- }
-
- @VisibleForTesting
- static void scheduleTokenRefresh(
- ScheduledExecutorService executor, AuthSession session, long
expiresAtMillis) {
- scheduleTokenRefresh(executor, session, expiresAtMillis, 0);
- }
-
- @VisibleForTesting
- static long getTimeToWaitByExpiresInMills(long expiresInMillis) {
- // how much ahead of time to start the refresh to allow it to complete
- long refreshWindowMillis = Math.min(expiresInMillis,
MAX_REFRESH_WINDOW_MILLIS);
- // how much time to wait before expiration
- long waitIntervalMillis = expiresInMillis - refreshWindowMillis;
- // how much time to actually wait
- return Math.max(waitIntervalMillis, MIN_REFRESH_WAIT_MILLIS);
- }
-
- private static void scheduleTokenRefresh(
- ScheduledExecutorService executor,
- AuthSession session,
- long expiresAtMillis,
- int retryTimes) {
- if (retryTimes < REFRESH_NUM_RETRIES) {
- long expiresInMillis = expiresAtMillis -
System.currentTimeMillis();
- long timeToWait = getTimeToWaitByExpiresInMills(expiresInMillis);
-
- executor.schedule(
- () -> doRefresh(executor, session, expiresAtMillis,
retryTimes),
- timeToWait,
- TimeUnit.MILLISECONDS);
- } else {
- LOG.warn("Failed to refresh token after {} retries.",
REFRESH_NUM_RETRIES);
- }
- }
-
- private static void doRefresh(
- ScheduledExecutorService executor,
- AuthSession session,
- long expiresAtMillis,
- int retryTimes) {
- long refreshStartTime = System.currentTimeMillis();
- boolean isSuccessful = session.refresh();
- if (isSuccessful) {
- scheduleTokenRefresh(
- executor,
- session,
- refreshStartTime +
session.authProvider.tokenRefreshInMills().get(),
- 0);
- } else {
- scheduleTokenRefresh(executor, session, expiresAtMillis,
retryTimes + 1);
- }
- }
-
- public static AuthSession createAuthSession(
- Options options, ScheduledExecutorService refreshExecutor) {
- String tokenProvider = options.get(RESTCatalogOptions.TOKEN_PROVIDER);
- if (StringUtils.isEmpty(tokenProvider)) {
- throw new IllegalArgumentException("token.provider is not set.");
- }
- AuthProvider authProvider =
AuthProviderFactory.createAuthProvider(tokenProvider, options);
- if (authProvider.keepRefreshed()) {
- return AuthSession.fromRefreshAuthProvider(refreshExecutor,
authProvider);
- } else {
- return new AuthSession(authProvider);
- }
- }
-}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenAuthProvider.java
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenAuthProvider.java
index a501b98556..39625818a6 100644
---
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenAuthProvider.java
+++
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenAuthProvider.java
@@ -35,15 +35,10 @@ public class BearTokenAuthProvider implements AuthProvider {
}
@Override
- public Map<String, String> header(
+ public Map<String, String> mergeAuthHeader(
Map<String, String> baseHeader, RESTAuthParameter
restAuthParameter) {
Map<String, String> headersWithAuth = new HashMap<>(baseHeader);
headersWithAuth.put(AUTHORIZATION_HEADER_KEY, BEARER_PREFIX + token);
return headersWithAuth;
}
-
- @Override
- public boolean refresh() {
- return true;
- }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFAuthProvider.java
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFAuthProvider.java
index f82720b33f..7eece82a92 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFAuthProvider.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFAuthProvider.java
@@ -18,19 +18,28 @@
package org.apache.paimon.rest.auth;
+import org.apache.paimon.annotation.VisibleForTesting;
+
import okhttp3.MediaType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
-import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
-import java.util.Optional;
+
+import static
org.apache.paimon.rest.RESTCatalog.TOKEN_EXPIRATION_SAFE_TIME_MILLIS;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
/** Auth provider for <b>Ali CLoud</b> DLF. */
public class DLFAuthProvider implements AuthProvider {
+ private static final Logger LOG =
LoggerFactory.getLogger(DLFAuthProvider.class);
+
public static final String DLF_AUTHORIZATION_HEADER_KEY = "Authorization";
public static final String DLF_CONTENT_MD5_HEADER_KEY = "Content-MD5";
public static final String DLF_CONTENT_TYPE_KEY = "Content-Type";
@@ -39,55 +48,36 @@ public class DLFAuthProvider implements AuthProvider {
public static final String DLF_AUTH_VERSION_HEADER_KEY = "x-dlf-version";
public static final String DLF_CONTENT_SHA56_HEADER_KEY =
"x-dlf-content-sha256";
public static final String DLF_CONTENT_SHA56_VALUE = "UNSIGNED-PAYLOAD";
- public static final double EXPIRED_FACTOR = 0.4;
- public static final DateTimeFormatter TOKEN_DATE_FORMATTER =
- DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'");
+
public static final DateTimeFormatter AUTH_DATE_TIME_FORMATTER =
DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmss'Z'");
- public static final DateTimeFormatter AUTH_DATE_FORMATTER =
- DateTimeFormatter.ofPattern("yyyyMMdd");
protected static final MediaType MEDIA_TYPE =
MediaType.parse("application/json");
- private final DLFTokenLoader tokenLoader;
-
- protected DLFToken token;
- private final boolean keepRefreshed;
- private Long expiresAtMillis;
- private final Long tokenRefreshInMills;
+ @Nullable private final DLFTokenLoader tokenLoader;
+ @Nullable protected DLFToken token;
private final String region;
- public static DLFAuthProvider buildRefreshToken(
- DLFTokenLoader tokenLoader, Long tokenRefreshInMills, String
region) {
- DLFToken token = tokenLoader.loadToken();
- Long expiresAtMillis = getExpirationInMills(token.getExpiration());
- return new DLFAuthProvider(
- tokenLoader, token, true, expiresAtMillis,
tokenRefreshInMills, region);
+ public static DLFAuthProvider fromTokenLoader(DLFTokenLoader tokenLoader,
String region) {
+ return new DLFAuthProvider(tokenLoader, null, region);
}
- public static DLFAuthProvider buildAKToken(
+ public static DLFAuthProvider fromAccessKey(
String accessKeyId, String accessKeySecret, String securityToken,
String region) {
DLFToken token = new DLFToken(accessKeyId, accessKeySecret,
securityToken, null);
- return new DLFAuthProvider(null, token, false, null, null, region);
+ return new DLFAuthProvider(null, token, region);
}
public DLFAuthProvider(
- DLFTokenLoader tokenLoader,
- DLFToken token,
- boolean keepRefreshed,
- Long expiresAtMillis,
- Long tokenRefreshInMills,
- String region) {
+ @Nullable DLFTokenLoader tokenLoader, @Nullable DLFToken token,
String region) {
this.tokenLoader = tokenLoader;
this.token = token;
- this.keepRefreshed = keepRefreshed;
- this.expiresAtMillis = expiresAtMillis;
- this.tokenRefreshInMills = tokenRefreshInMills;
this.region = region;
}
@Override
- public Map<String, String> header(
+ public Map<String, String> mergeAuthHeader(
Map<String, String> baseHeader, RESTAuthParameter
restAuthParameter) {
+ DLFToken token = getFreshToken();
try {
String dateTime =
baseHeader.getOrDefault(
@@ -109,6 +99,43 @@ public class DLFAuthProvider implements AuthProvider {
}
}
+ @VisibleForTesting
+ DLFToken getFreshToken() {
+ if (shouldRefresh()) {
+ synchronized (this) {
+ if (shouldRefresh()) {
+ refreshToken();
+ }
+ }
+ }
+ return token;
+ }
+
+ private void refreshToken() {
+ checkNotNull(tokenLoader);
+ LOG.info("begin refresh meta token for loader [{}]",
tokenLoader.description());
+ this.token = tokenLoader.loadToken();
+ checkNotNull(token);
+ LOG.info(
+ "end refresh meta token for loader [{}] expiresAtMillis [{}]",
+ tokenLoader.description(),
+ token.getExpirationAtMills());
+ }
+
+ private boolean shouldRefresh() {
+ // no token, get new one
+ if (token == null) {
+ return true;
+ }
+ // never expire
+ Long expireTime = token.getExpirationAtMills();
+ if (expireTime == null) {
+ return false;
+ }
+ long now = System.currentTimeMillis();
+ return expireTime - now < TOKEN_EXPIRATION_SAFE_TIME_MILLIS;
+ }
+
public static Map<String, String> generateSignHeaders(
String data, String dateTime, String securityToken) throws
Exception {
Map<String, String> signHeaders = new HashMap<>();
@@ -124,53 +151,4 @@ public class DLFAuthProvider implements AuthProvider {
}
return signHeaders;
}
-
- @Override
- public boolean refresh() {
- long start = System.currentTimeMillis();
- DLFToken newToken = tokenLoader.loadToken();
- if (newToken == null) {
- return false;
- }
- this.expiresAtMillis = start + this.tokenRefreshInMills;
- this.token = newToken;
- return true;
- }
-
- @Override
- public boolean keepRefreshed() {
- return this.keepRefreshed;
- }
-
- @Override
- public boolean willSoonExpire() {
- if (keepRefreshed()) {
- return expiresAtMillis().get() - System.currentTimeMillis()
- < tokenRefreshInMills().get() * EXPIRED_FACTOR;
- } else {
- return false;
- }
- }
-
- @Override
- public Optional<Long> expiresAtMillis() {
- return Optional.ofNullable(this.expiresAtMillis);
- }
-
- @Override
- public Optional<Long> tokenRefreshInMills() {
- return Optional.ofNullable(this.tokenRefreshInMills);
- }
-
- private static Long getExpirationInMills(String dateStr) {
- try {
- if (dateStr == null) {
- return null;
- }
- LocalDateTime dateTime = LocalDateTime.parse(dateStr,
TOKEN_DATE_FORMATTER);
- return dateTime.atZone(ZoneOffset.UTC).toInstant().toEpochMilli();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFAuthProviderFactory.java
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFAuthProviderFactory.java
index 72d9ea5580..030dc396e1 100644
---
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFAuthProviderFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFAuthProviderFactory.java
@@ -24,7 +24,6 @@ import org.apache.paimon.rest.RESTCatalogOptions;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import static org.apache.paimon.rest.RESTCatalogOptions.TOKEN_REFRESH_TIME;
import static org.apache.paimon.rest.RESTCatalogOptions.URI;
/** Factory for {@link DLFAuthProvider}. */
@@ -44,16 +43,14 @@ public class DLFAuthProviderFactory implements
AuthProviderFactory {
DLFTokenLoader dlfTokenLoader =
DLFTokenLoaderFactory.createDLFTokenLoader(
options.get(RESTCatalogOptions.DLF_TOKEN_LOADER),
options);
- long tokenRefreshInMills =
options.get(TOKEN_REFRESH_TIME).toMillis();
- return DLFAuthProvider.buildRefreshToken(dlfTokenLoader,
tokenRefreshInMills, region);
+ return DLFAuthProvider.fromTokenLoader(dlfTokenLoader, region);
} else if
(options.getOptional(RESTCatalogOptions.DLF_TOKEN_PATH).isPresent()) {
DLFTokenLoader dlfTokenLoader =
DLFTokenLoaderFactory.createDLFTokenLoader("local_file",
options);
- long tokenRefreshInMills =
options.get(TOKEN_REFRESH_TIME).toMillis();
- return DLFAuthProvider.buildRefreshToken(dlfTokenLoader,
tokenRefreshInMills, region);
+ return DLFAuthProvider.fromTokenLoader(dlfTokenLoader, region);
} else if
(options.getOptional(RESTCatalogOptions.DLF_ACCESS_KEY_ID).isPresent()
&&
options.getOptional(RESTCatalogOptions.DLF_ACCESS_KEY_SECRET).isPresent()) {
- return DLFAuthProvider.buildAKToken(
+ return DLFAuthProvider.fromAccessKey(
options.get(RESTCatalogOptions.DLF_ACCESS_KEY_ID),
options.get(RESTCatalogOptions.DLF_ACCESS_KEY_SECRET),
options.get(RESTCatalogOptions.DLF_SECURITY_TOKEN),
diff --git
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFECSTokenLoader.java
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFECSTokenLoader.java
index c73f32c990..3806b045d3 100644
---
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFECSTokenLoader.java
+++
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFECSTokenLoader.java
@@ -53,7 +53,7 @@ public class DLFECSTokenLoader implements DLFTokenLoader {
.readTimeout(Duration.ofMinutes(3))
.build();
- private String ecsMetadataURL;
+ private final String ecsMetadataURL;
private String roleName;
@@ -70,6 +70,11 @@ public class DLFECSTokenLoader implements DLFTokenLoader {
return getToken(ecsMetadataURL + roleName);
}
+ @Override
+ public String description() {
+ return ecsMetadataURL;
+ }
+
private static String getRole(String url) {
try {
return getResponseBody(url);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFLocalFileTokenLoader.java
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFLocalFileTokenLoader.java
index c1aea84280..22d0f4090d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFLocalFileTokenLoader.java
+++
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFLocalFileTokenLoader.java
@@ -21,17 +21,12 @@ package org.apache.paimon.rest.auth;
import org.apache.paimon.utils.FileIOUtils;
import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.UncheckedIOException;
import static org.apache.paimon.rest.RESTObjectMapper.OBJECT_MAPPER;
/** DLF Token Loader for local file. */
public class DLFLocalFileTokenLoader implements DLFTokenLoader {
- private static final long[] READ_TOKEN_FILE_BACKOFF_WAIT_TIME_MILLIS =
{1_000, 3_000, 5_000};
-
private final String tokenFilePath;
public DLFLocalFileTokenLoader(String tokenFilePath) {
@@ -40,25 +35,32 @@ public class DLFLocalFileTokenLoader implements
DLFTokenLoader {
@Override
public DLFToken loadToken() {
- return readToken(tokenFilePath, 0);
+ return readToken(tokenFilePath);
}
- protected static DLFToken readToken(String tokenFilePath, int retryTimes) {
- try {
- File tokenFile = new File(tokenFilePath);
- if (tokenFile.exists()) {
- String tokenStr = FileIOUtils.readFileUtf8(tokenFile);
+ @Override
+ public String description() {
+ return tokenFilePath;
+ }
+
+ protected static DLFToken readToken(String tokenFilePath) {
+ int retry = 1;
+ Exception lastException = null;
+ while (retry <= 5) {
+ try {
+ String tokenStr = FileIOUtils.readFileUtf8(new
File(tokenFilePath));
return OBJECT_MAPPER.readValue(tokenStr, DLFToken.class);
- } else if (retryTimes <
READ_TOKEN_FILE_BACKOFF_WAIT_TIME_MILLIS.length - 1) {
-
Thread.sleep(READ_TOKEN_FILE_BACKOFF_WAIT_TIME_MILLIS[retryTimes]);
- return readToken(tokenFilePath, retryTimes + 1);
- } else {
- throw new FileNotFoundException(tokenFilePath);
+ } catch (Exception e) {
+ lastException = e;
+ }
+ try {
+ Thread.sleep(retry * 1000L);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
}
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
+ retry++;
}
+ throw new RuntimeException(lastException);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFToken.java
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFToken.java
index c961dfe38b..37590cad8c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFToken.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFToken.java
@@ -19,15 +19,25 @@
package org.apache.paimon.rest.auth;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import javax.annotation.Nullable;
+
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
import java.util.Objects;
/** <b>Ali CLoud</b> DLF Token. */
@JsonIgnoreProperties(ignoreUnknown = true)
public class DLFToken {
+ public static final DateTimeFormatter TOKEN_DATE_FORMATTER =
+ DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'");
+
private static final String ACCESS_KEY_ID_FIELD_NAME = "AccessKeyId";
private static final String ACCESS_KEY_SECRET_FIELD_NAME =
"AccessKeySecret";
private static final String SECURITY_TOKEN_FIELD_NAME = "SecurityToken";
@@ -42,19 +52,30 @@ public class DLFToken {
@JsonProperty(SECURITY_TOKEN_FIELD_NAME)
private final String securityToken;
+ @JsonInclude(JsonInclude.Include.NON_NULL)
@JsonProperty(EXPIRATION_FIELD_NAME)
+ @Nullable
private final String expiration;
+ @JsonIgnore @Nullable private final Long expirationAtMills;
+
@JsonCreator
public DLFToken(
@JsonProperty(ACCESS_KEY_ID_FIELD_NAME) String accessKeyId,
@JsonProperty(ACCESS_KEY_SECRET_FIELD_NAME) String accessKeySecret,
@JsonProperty(SECURITY_TOKEN_FIELD_NAME) String securityToken,
- @JsonProperty(EXPIRATION_FIELD_NAME) String expiration) {
+ @Nullable @JsonProperty(EXPIRATION_FIELD_NAME) String expiration) {
this.accessKeyId = accessKeyId;
this.accessKeySecret = accessKeySecret;
this.securityToken = securityToken;
this.expiration = expiration;
+ if (expiration == null) {
+ this.expirationAtMills = null;
+ } else {
+ LocalDateTime dateTime = LocalDateTime.parse(expiration,
TOKEN_DATE_FORMATTER);
+ // Note: the expiration string carries no zone of its own; it is interpreted as UTC.
+ this.expirationAtMills =
dateTime.atZone(ZoneOffset.UTC).toInstant().toEpochMilli();
+ }
}
public String getAccessKeyId() {
@@ -69,8 +90,9 @@ public class DLFToken {
return securityToken;
}
- public String getExpiration() {
- return expiration;
+ @Nullable
+ public Long getExpirationAtMills() {
+ return expirationAtMills;
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFTokenLoader.java
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFTokenLoader.java
index fb2199e320..0b45284223 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFTokenLoader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFTokenLoader.java
@@ -22,4 +22,6 @@ package org.apache.paimon.rest.auth;
public interface DLFTokenLoader {
DLFToken loadToken();
+
+ String description();
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/RESTAuthFunction.java
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/RESTAuthFunction.java
index b76701845f..254eca75ca 100644
---
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/RESTAuthFunction.java
+++
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/RESTAuthFunction.java
@@ -25,18 +25,15 @@ import java.util.function.Function;
public class RESTAuthFunction implements Function<RESTAuthParameter,
Map<String, String>> {
private final Map<String, String> initHeader;
- private final AuthSession authSession;
+ private final AuthProvider authProvider;
- public RESTAuthFunction(Map<String, String> initHeader, AuthSession
authSession) {
+ public RESTAuthFunction(Map<String, String> initHeader, AuthProvider
authProvider) {
this.initHeader = initHeader;
- this.authSession = authSession;
+ this.authProvider = authProvider;
}
@Override
public Map<String, String> apply(RESTAuthParameter restAuthParameter) {
- if (authSession != null) {
- return authSession.getAuthProvider().header(initHeader,
restAuthParameter);
- }
- return initHeader;
+ return authProvider.mergeAuthHeader(initHeader, restAuthParameter);
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java
b/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java
index b1f68bd71e..dfdfe5f9b9 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java
@@ -19,7 +19,6 @@
package org.apache.paimon.rest;
import org.apache.paimon.rest.auth.AuthProvider;
-import org.apache.paimon.rest.auth.AuthSession;
import org.apache.paimon.rest.auth.BearTokenAuthProvider;
import org.apache.paimon.rest.auth.RESTAuthFunction;
import org.apache.paimon.rest.auth.RESTAuthParameter;
@@ -85,9 +84,8 @@ public class HttpClientTest {
httpClient = new HttpClient(server.getBaseUrl());
httpClient.setErrorHandler(errorHandler);
AuthProvider authProvider = new BearTokenAuthProvider(TOKEN);
- AuthSession authSession = new AuthSession(authProvider);
headers = new HashMap<>();
- restAuthFunction = new RESTAuthFunction(headers, authSession);
+ restAuthFunction = new RESTAuthFunction(headers, authProvider);
}
@After
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java
b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java
index dab6b0fb34..6b8e83ff52 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java
@@ -99,7 +99,7 @@ class MockRESTCatalogTest extends RESTCatalogTest {
String akSecret = "akSecret" + UUID.randomUUID();
String securityToken = "securityToken" + UUID.randomUUID();
String region = "cn-hangzhou";
- this.authProvider = DLFAuthProvider.buildAKToken(akId, akSecret,
securityToken, region);
+ this.authProvider = DLFAuthProvider.fromAccessKey(akId, akSecret,
securityToken, region);
this.authMap =
ImmutableMap.of(
RESTCatalogOptions.TOKEN_PROVIDER.key(),
AuthProviderEnum.DLF.identifier(),
@@ -122,7 +122,7 @@ class MockRESTCatalogTest extends RESTCatalogTest {
new Options(
ImmutableMap.of(
RESTCatalogOptions.DLF_TOKEN_PATH.key(), tokenPath)));
- this.authProvider = DLFAuthProvider.buildRefreshToken(tokenLoader,
1000_000L, region);
+ this.authProvider = DLFAuthProvider.fromTokenLoader(tokenLoader,
region);
this.authMap =
ImmutableMap.of(
RESTCatalogOptions.TOKEN_PROVIDER.key(),
AuthProviderEnum.DLF.identifier(),
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index 2c4cc894a7..2d539fe5d1 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -247,7 +247,7 @@ public class RESTCatalogServer {
resourcePath, parameters,
request.getMethod(), data);
String authToken =
authProvider
- .header(headers, restAuthParameter)
+ .mergeAuthHeader(headers,
restAuthParameter)
.get(AUTHORIZATION_HEADER_KEY);
if (!authToken.equals(token)) {
return new MockResponse().setResponseCode(401);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index cb8392dd96..fc051ecad4 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -81,7 +81,7 @@ import static
org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE;
import static org.apache.paimon.CoreOptions.METASTORE_TAG_TO_PARTITION;
import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME;
import static org.apache.paimon.rest.RESTCatalog.PAGE_TOKEN;
-import static org.apache.paimon.rest.auth.DLFAuthProvider.TOKEN_DATE_FORMATTER;
+import static org.apache.paimon.rest.auth.DLFToken.TOKEN_DATE_FORMATTER;
import static
org.apache.paimon.utils.SnapshotManagerTest.createSnapshotWithMillis;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthSessionTest.java
b/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthProviderTest.java
similarity index 61%
rename from
paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthSessionTest.java
rename to
paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthProviderTest.java
index 16285e734d..770dadab5f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthSessionTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthProviderTest.java
@@ -21,7 +21,6 @@ package org.apache.paimon.rest.auth;
import org.apache.paimon.options.Options;
import org.apache.paimon.rest.RESTCatalogOptions;
import org.apache.paimon.utils.Pair;
-import org.apache.paimon.utils.ThreadPoolUtils;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -29,7 +28,6 @@ import org.apache.commons.io.FileUtils;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-import org.mockito.Mockito;
import java.io.File;
import java.io.IOException;
@@ -37,10 +35,9 @@ import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.HashMap;
import java.util.Map;
-import java.util.Optional;
import java.util.UUID;
-import java.util.concurrent.ScheduledExecutorService;
+import static
org.apache.paimon.rest.RESTCatalog.TOKEN_EXPIRATION_SAFE_TIME_MILLIS;
import static org.apache.paimon.rest.RESTCatalogOptions.DLF_ACCESS_KEY_ID;
import static org.apache.paimon.rest.RESTCatalogOptions.DLF_ACCESS_KEY_SECRET;
import static org.apache.paimon.rest.RESTCatalogOptions.DLF_REGION;
@@ -50,21 +47,16 @@ import static
org.apache.paimon.rest.RESTCatalogOptions.DLF_TOKEN_ECS_ROLE_NAME;
import static org.apache.paimon.rest.RESTCatalogOptions.DLF_TOKEN_LOADER;
import static org.apache.paimon.rest.RESTCatalogOptions.DLF_TOKEN_PATH;
import static org.apache.paimon.rest.RESTCatalogOptions.TOKEN;
-import static org.apache.paimon.rest.RESTCatalogOptions.TOKEN_REFRESH_TIME;
-import static
org.apache.paimon.rest.auth.AuthSession.MAX_REFRESH_WINDOW_MILLIS;
-import static org.apache.paimon.rest.auth.AuthSession.MIN_REFRESH_WAIT_MILLIS;
-import static org.apache.paimon.rest.auth.AuthSession.REFRESH_NUM_RETRIES;
+import static org.apache.paimon.rest.RESTCatalogOptions.TOKEN_PROVIDER;
import static
org.apache.paimon.rest.auth.DLFAuthProvider.DLF_AUTHORIZATION_HEADER_KEY;
import static org.apache.paimon.rest.auth.DLFAuthProvider.DLF_DATE_HEADER_KEY;
-import static org.apache.paimon.rest.auth.DLFAuthProvider.TOKEN_DATE_FORMATTER;
+import static org.apache.paimon.rest.auth.DLFToken.TOKEN_DATE_FORMATTER;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-/** Test for {@link AuthSession}. */
-public class AuthSessionTest {
+/** Test for {@link AuthProvider}. */
+public class AuthProviderTest {
@Rule public TemporaryFolder folder = new TemporaryFolder();
private static final ObjectMapper OBJECT_MAPPER_INSTANCE = new
ObjectMapper();
@@ -77,36 +69,28 @@ public class AuthSessionTest {
initialHeaders.put("k2", "v2");
Options options = new Options();
options.set(TOKEN.key(), token);
- AuthProvider authProvider =
-
AuthProviderFactory.createAuthProvider(AuthProviderEnum.BEAR.identifier(),
options);
- AuthSession session = new AuthSession(authProvider);
- Map<String, String> headers =
session.getAuthProvider().header(initialHeaders, null);
+ options.set(TOKEN_PROVIDER.key(), "bear");
+ AuthProvider authProvider =
AuthProviderFactory.createAuthProvider(options);
+ Map<String, String> headers =
authProvider.mergeAuthHeader(initialHeaders, null);
assertEquals(
headers.get(BearTokenAuthProvider.AUTHORIZATION_HEADER_KEY),
"Bearer " + token);
}
@Test
- public void testRefreshDLFAuthTokenFileAuthProvider() throws IOException,
InterruptedException {
+ public void testRefreshDLFAuthTokenFileAuthProvider() throws IOException {
String fileName = UUID.randomUUID().toString();
Pair<File, String> tokenFile2Token =
generateTokenAndWriteToFile(fileName);
String theFirstGenerateToken = tokenFile2Token.getRight();
File tokenFile = tokenFile2Token.getLeft();
- long tokenRefreshInMills = 1000;
- AuthProvider authProvider =
- generateDLFAuthProvider(Optional.of(tokenRefreshInMills),
fileName, "serverUrl");
- ScheduledExecutorService executor =
- ThreadPoolUtils.createScheduledThreadPool(1, "refresh-token");
- AuthSession session = AuthSession.fromRefreshAuthProvider(executor,
authProvider);
- DLFAuthProvider dlfAuthProvider = (DLFAuthProvider)
session.getAuthProvider();
+ DLFAuthProvider authProvider = generateDLFAuthProvider(fileName,
"serverUrl");
String theFirstFetchToken =
-
OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
+
OBJECT_MAPPER_INSTANCE.writeValueAsString(authProvider.getFreshToken());
assertEquals(theFirstFetchToken, theFirstGenerateToken);
tokenFile.delete();
tokenFile2Token = generateTokenAndWriteToFile(fileName);
String theSecondGenerateToken = tokenFile2Token.getRight();
- Thread.sleep(tokenRefreshInMills * 2);
String theSecondFetchToken =
-
OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
+
OBJECT_MAPPER_INSTANCE.writeValueAsString(authProvider.getFreshToken());
// If the second fetched token differs from the first fetched token, the
// refresh succeeded. The refresh may fail in the test environment, so we
// first check whether it actually succeeded.
if (!theSecondFetchToken.equals(theFirstFetchToken)) {
@@ -115,62 +99,23 @@ public class AuthSessionTest {
}
@Test
- public void testRefreshAuthProviderIsSoonExpire() throws IOException,
InterruptedException {
+ public void testRefreshAuthProviderIsSoonExpire() throws IOException {
String fileName = UUID.randomUUID().toString();
Pair<File, String> tokenFile2Token =
generateTokenAndWriteToFile(fileName);
String token = tokenFile2Token.getRight();
File tokenFile = tokenFile2Token.getLeft();
- long tokenRefreshInMills = 5000L;
- AuthProvider authProvider =
- generateDLFAuthProvider(Optional.of(tokenRefreshInMills),
fileName, "serverUrl");
- AuthSession session = AuthSession.fromRefreshAuthProvider(null,
authProvider);
- DLFAuthProvider dlfAuthProvider = (DLFAuthProvider)
session.getAuthProvider();
- String authToken =
OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
+ DLFAuthProvider authProvider = generateDLFAuthProvider(fileName,
"serverUrl");
+ String authToken =
OBJECT_MAPPER_INSTANCE.writeValueAsString(authProvider.getFreshToken());
assertEquals(token, authToken);
- Thread.sleep((long) (tokenRefreshInMills * (1 -
DLFAuthProvider.EXPIRED_FACTOR)) + 10L);
tokenFile.delete();
tokenFile2Token = generateTokenAndWriteToFile(fileName);
token = tokenFile2Token.getRight();
tokenFile = tokenFile2Token.getLeft();
FileUtils.writeStringToFile(tokenFile, token);
- dlfAuthProvider = (DLFAuthProvider) session.getAuthProvider();
- authToken =
OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
+ authToken =
OBJECT_MAPPER_INSTANCE.writeValueAsString(authProvider.getFreshToken());
assertEquals(token, authToken);
}
- @Test
- public void testRetryWhenRefreshFail() throws Exception {
- AuthProvider authProvider = Mockito.mock(DLFAuthProvider.class);
- long expiresAtMillis = System.currentTimeMillis() - 1000L;
-
when(authProvider.expiresAtMillis()).thenReturn(Optional.of(expiresAtMillis));
- when(authProvider.tokenRefreshInMills()).thenReturn(Optional.of(50L));
- when(authProvider.keepRefreshed()).thenReturn(true);
- when(authProvider.refresh()).thenReturn(false);
- AuthSession session = AuthSession.fromRefreshAuthProvider(null,
authProvider);
- AuthSession.scheduleTokenRefresh(
- ThreadPoolUtils.createScheduledThreadPool(1, "refresh-token"),
- session,
- expiresAtMillis);
- Thread.sleep(10_000L);
- verify(authProvider, Mockito.times(REFRESH_NUM_RETRIES + 1)).refresh();
- }
-
- @Test
- public void testGetTimeToWaitByExpiresInMills() {
- long expiresInMillis = -100L;
- long timeToWait =
AuthSession.getTimeToWaitByExpiresInMills(expiresInMillis);
- assertEquals(MIN_REFRESH_WAIT_MILLIS, timeToWait);
- expiresInMillis = (long) (MAX_REFRESH_WINDOW_MILLIS * 0.5);
- timeToWait =
AuthSession.getTimeToWaitByExpiresInMills(expiresInMillis);
- assertEquals(MIN_REFRESH_WAIT_MILLIS, timeToWait);
- expiresInMillis = MAX_REFRESH_WINDOW_MILLIS;
- timeToWait =
AuthSession.getTimeToWaitByExpiresInMills(expiresInMillis);
- assertEquals(timeToWait, MIN_REFRESH_WAIT_MILLIS);
- expiresInMillis = MAX_REFRESH_WINDOW_MILLIS * 2L;
- timeToWait =
AuthSession.getTimeToWaitByExpiresInMills(expiresInMillis);
- assertEquals(timeToWait, MAX_REFRESH_WINDOW_MILLIS);
- }
-
@Test
public void testCreateDLFAuthProviderByStsToken() throws IOException {
Options options = new Options();
@@ -182,11 +127,10 @@ public class AuthSessionTest {
options.set(DLF_ACCESS_KEY_SECRET.key(), token.getAccessKeySecret());
options.set(DLF_SECURITY_TOKEN.key(), token.getSecurityToken());
options.set(DLF_REGION.key(), "cn-hangzhou");
- AuthProvider authProvider =
-
AuthProviderFactory.createAuthProvider(AuthProviderEnum.DLF.identifier(),
options);
- AuthSession session = AuthSession.fromRefreshAuthProvider(null,
authProvider);
- DLFAuthProvider dlfAuthProvider = (DLFAuthProvider)
session.getAuthProvider();
- String authToken =
OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
+ options.set(TOKEN_PROVIDER, "dlf");
+ DLFAuthProvider authProvider =
+ (DLFAuthProvider)
AuthProviderFactory.createAuthProvider(options);
+ String authToken =
OBJECT_MAPPER_INSTANCE.writeValueAsString(authProvider.getFreshToken());
assertEquals(OBJECT_MAPPER_INSTANCE.writeValueAsString(token),
authToken);
}
@@ -199,11 +143,10 @@ public class AuthSessionTest {
options.set(DLF_ACCESS_KEY_ID.key(), token.getAccessKeyId());
options.set(DLF_ACCESS_KEY_SECRET.key(), token.getAccessKeySecret());
options.set(DLF_REGION.key(), "cn-hangzhou");
- AuthProvider authProvider =
-
AuthProviderFactory.createAuthProvider(AuthProviderEnum.DLF.identifier(),
options);
- AuthSession session = AuthSession.fromRefreshAuthProvider(null,
authProvider);
- DLFAuthProvider dlfAuthProvider = (DLFAuthProvider)
session.getAuthProvider();
- String authToken =
OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
+ options.set(TOKEN_PROVIDER, "dlf");
+ DLFAuthProvider authProvider =
+ (DLFAuthProvider)
AuthProviderFactory.createAuthProvider(options);
+ String authToken =
OBJECT_MAPPER_INSTANCE.writeValueAsString(authProvider.getFreshToken());
assertEquals(OBJECT_MAPPER_INSTANCE.writeValueAsString(token),
authToken);
}
@@ -212,55 +155,43 @@ public class AuthSessionTest {
String fileName = UUID.randomUUID().toString();
Pair<File, String> tokenFile2Token =
generateTokenAndWriteToFile(fileName);
String token = tokenFile2Token.getRight();
- AuthProvider authProvider =
- generateDLFAuthProvider(Optional.empty(), fileName,
"serverUrl");
- ScheduledExecutorService executor =
- ThreadPoolUtils.createScheduledThreadPool(1, "refresh-token");
- AuthSession session = AuthSession.fromRefreshAuthProvider(executor,
authProvider);
- DLFAuthProvider dlfAuthProvider = (DLFAuthProvider)
session.getAuthProvider();
- String authToken =
OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
+ DLFAuthProvider authProvider = generateDLFAuthProvider(fileName,
"serverUrl");
+ String authToken =
OBJECT_MAPPER_INSTANCE.writeValueAsString(authProvider.getFreshToken());
assertEquals(authToken, token);
}
@Test
public void testCreateDLFAuthProviderWithoutNeedConf() {
+ Options options = new Options();
+ options.set(TOKEN_PROVIDER, "dlf");
+ // Only the provider type is set here. Hand this populated Options (not a fresh,
+ // empty one) to the factory so the assertion exercises the configuration built above.
assertThrows(
IllegalArgumentException.class,
- () ->
- AuthProviderFactory.createAuthProvider(
- AuthProviderEnum.DLF.identifier(), new
Options()));
+ () -> AuthProviderFactory.createAuthProvider(options));
}
@Test
- public void testCreateDlfAuthProviderByDLFTokenLoader()
- throws IOException, InterruptedException {
+ public void testCreateDlfAuthProviderByDLFTokenLoader() throws IOException
{
String fileName = UUID.randomUUID().toString();
Pair<File, String> tokenFile2Token =
generateTokenAndWriteToFile(fileName);
String theFirstGenerateToken = tokenFile2Token.getRight();
File tokenFile = tokenFile2Token.getLeft();
- long tokenRefreshInMills = 1000;
// create options with token loader
Options options = new Options();
options.set(DLF_TOKEN_LOADER.key(), "local_file");
options.set(DLF_TOKEN_PATH.key(), folder.getRoot().getPath() + "/" +
fileName);
options.set(RESTCatalogOptions.URI.key(), "serverUrl");
options.set(DLF_REGION.key(), "cn-hangzhou");
- options.set(TOKEN_REFRESH_TIME.key(), tokenRefreshInMills + "ms");
- AuthProvider authProvider =
-
AuthProviderFactory.createAuthProvider(AuthProviderEnum.DLF.identifier(),
options);
- ScheduledExecutorService executor =
- ThreadPoolUtils.createScheduledThreadPool(1, "refresh-token");
- AuthSession session = AuthSession.fromRefreshAuthProvider(executor,
authProvider);
- DLFAuthProvider dlfAuthProvider = (DLFAuthProvider)
session.getAuthProvider();
+ options.set(TOKEN_PROVIDER, "dlf");
+ DLFAuthProvider authProvider =
+ (DLFAuthProvider)
AuthProviderFactory.createAuthProvider(options);
String theFirstFetchToken =
-
OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
+
OBJECT_MAPPER_INSTANCE.writeValueAsString(authProvider.getFreshToken());
assertEquals(theFirstFetchToken, theFirstGenerateToken);
tokenFile.delete();
tokenFile2Token = generateTokenAndWriteToFile(fileName);
String theSecondGenerateToken = tokenFile2Token.getRight();
- Thread.sleep(tokenRefreshInMills * 2);
String theSecondFetchToken =
-
OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
+
OBJECT_MAPPER_INSTANCE.writeValueAsString(authProvider.getFreshToken());
// If the second fetched token differs from the first fetched token, the
// refresh succeeded. The refresh may fail in the test environment, so we
// first check whether it actually succeeded.
if (!theSecondFetchToken.equals(theFirstFetchToken)) {
@@ -269,8 +200,7 @@ public class AuthSessionTest {
}
@Test
- public void testCreateDlfAuthProviderByCustomDLFTokenLoader()
- throws IOException, InterruptedException {
+ public void testCreateDlfAuthProviderByCustomDLFTokenLoader() {
DLFToken customToken = generateToken();
// create options with custom token loader
Options options = new Options();
@@ -280,21 +210,17 @@ public class AuthSessionTest {
options.set(DLF_SECURITY_TOKEN.key(), customToken.getSecurityToken());
options.set(RESTCatalogOptions.URI.key(), "serverUrl");
options.set(DLF_REGION.key(), "cn-hangzhou");
- AuthProvider authProvider =
-
AuthProviderFactory.createAuthProvider(AuthProviderEnum.DLF.identifier(),
options);
- ScheduledExecutorService executor =
- ThreadPoolUtils.createScheduledThreadPool(1, "refresh-token");
- AuthSession session = AuthSession.fromRefreshAuthProvider(executor,
authProvider);
- DLFAuthProvider dlfAuthProvider = (DLFAuthProvider)
session.getAuthProvider();
- DLFToken fetchToken = dlfAuthProvider.token;
+ options.set(TOKEN_PROVIDER, "dlf");
+ DLFAuthProvider authProvider =
+ (DLFAuthProvider)
AuthProviderFactory.createAuthProvider(options);
+ DLFToken fetchToken = authProvider.getFreshToken();
assertEquals(fetchToken.getAccessKeyId(),
customToken.getAccessKeyId());
assertEquals(fetchToken.getAccessKeySecret(),
customToken.getAccessKeySecret());
assertEquals(fetchToken.getSecurityToken(),
customToken.getSecurityToken());
}
@Test
- public void testCreateDlfAuthProviderByECSTokenProvider()
- throws IOException, InterruptedException {
+ public void testCreateDlfAuthProviderByECSTokenProvider() throws
IOException {
MockECSMetadataService mockECSMetadataService = new
MockECSMetadataService("EcsTestRole");
mockECSMetadataService.start();
try {
@@ -302,7 +228,6 @@ public class AuthSessionTest {
mockECSMetadataService.setMockToken(theFirstMockToken);
String theFirstMockTokenStr =
OBJECT_MAPPER_INSTANCE.writeValueAsString(theFirstMockToken);
- long tokenRefreshInMills = 1000;
// create options with token loader
Options options = new Options();
options.set(DLF_TOKEN_LOADER.key(), "ecs");
@@ -311,34 +236,41 @@ public class AuthSessionTest {
mockECSMetadataService.getUrl() +
"latest/meta-data/Ram/security-credentials/");
options.set(RESTCatalogOptions.URI.key(), "serverUrl");
options.set(DLF_REGION.key(), "cn-hangzhou");
- options.set(TOKEN_REFRESH_TIME.key(), tokenRefreshInMills + "ms");
- AuthProvider authProvider =
- AuthProviderFactory.createAuthProvider(
- AuthProviderEnum.DLF.identifier(), options);
- ScheduledExecutorService executor =
- ThreadPoolUtils.createScheduledThreadPool(1,
"refresh-token");
- AuthSession session =
AuthSession.fromRefreshAuthProvider(executor, authProvider);
- DLFAuthProvider dlfAuthProvider = (DLFAuthProvider)
session.getAuthProvider();
+ options.set(TOKEN_PROVIDER, "dlf");
+
+ DLFAuthProvider authProvider =
+ (DLFAuthProvider)
AuthProviderFactory.createAuthProvider(options);
+
+ // first token
String theFirstFetchTokenStr =
-
OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
+
OBJECT_MAPPER_INSTANCE.writeValueAsString(authProvider.getFreshToken());
assertEquals(theFirstFetchTokenStr, theFirstMockTokenStr);
- DLFToken theSecondMockToken = generateToken();
+ // second token
+ DLFToken theSecondMockToken =
+ generateToken(
+ ZonedDateTime.now(ZoneOffset.UTC)
+
.plusSeconds(TOKEN_EXPIRATION_SAFE_TIME_MILLIS * 2 / 1000));
String theSecondMockTokenStr =
OBJECT_MAPPER_INSTANCE.writeValueAsString(theSecondMockToken);
mockECSMetadataService.setMockToken(theSecondMockToken);
- Thread.sleep(tokenRefreshInMills * 2);
String theSecondFetchTokenStr =
-
OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
+
OBJECT_MAPPER_INSTANCE.writeValueAsString(authProvider.getFreshToken());
assertEquals(theSecondFetchTokenStr, theSecondMockTokenStr);
+
+ // third token, should not refresh
+ DLFToken theThirdMockToken = generateToken();
+ mockECSMetadataService.setMockToken(theThirdMockToken);
+ String theThirdFetchTokenStr =
+
OBJECT_MAPPER_INSTANCE.writeValueAsString(authProvider.getFreshToken());
+ assertEquals(theThirdFetchTokenStr, theSecondMockTokenStr);
} finally {
mockECSMetadataService.shutdown();
}
}
@Test
- public void testCreateDlfAuthProviderByECSTokenProviderWithDefineRole()
- throws IOException, InterruptedException {
+ public void testCreateDlfAuthProviderByECSTokenProviderWithDefineRole()
throws IOException {
MockECSMetadataService mockECSMetadataService = new
MockECSMetadataService("CustomRole");
mockECSMetadataService.start();
try {
@@ -346,7 +278,6 @@ public class AuthSessionTest {
mockECSMetadataService.setMockToken(theFirstMockToken);
String theFirstMockTokenStr =
OBJECT_MAPPER_INSTANCE.writeValueAsString(theFirstMockToken);
- long tokenRefreshInMills = 1000;
// create options with token loader
Options options = new Options();
options.set(DLF_TOKEN_LOADER.key(), "ecs");
@@ -356,25 +287,19 @@ public class AuthSessionTest {
options.set(DLF_TOKEN_ECS_ROLE_NAME.key(), "CustomRole");
options.set(RESTCatalogOptions.URI.key(), "serverUrl");
options.set(DLF_REGION.key(), "cn-hangzhou");
- options.set(TOKEN_REFRESH_TIME.key(), tokenRefreshInMills + "ms");
- AuthProvider authProvider =
- AuthProviderFactory.createAuthProvider(
- AuthProviderEnum.DLF.identifier(), options);
- ScheduledExecutorService executor =
- ThreadPoolUtils.createScheduledThreadPool(1,
"refresh-token");
- AuthSession session =
AuthSession.fromRefreshAuthProvider(executor, authProvider);
- DLFAuthProvider dlfAuthProvider = (DLFAuthProvider)
session.getAuthProvider();
+ options.set(TOKEN_PROVIDER, "dlf");
+ DLFAuthProvider authProvider =
+ (DLFAuthProvider)
AuthProviderFactory.createAuthProvider(options);
String theFirstFetchTokenStr =
-
OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
+
OBJECT_MAPPER_INSTANCE.writeValueAsString(authProvider.getFreshToken());
assertEquals(theFirstFetchTokenStr, theFirstMockTokenStr);
DLFToken theSecondMockToken = generateToken();
String theSecondMockTokenStr =
OBJECT_MAPPER_INSTANCE.writeValueAsString(theSecondMockToken);
mockECSMetadataService.setMockToken(theSecondMockToken);
- Thread.sleep(tokenRefreshInMills * 2);
String theSecondFetchTokenStr =
-
OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
+
OBJECT_MAPPER_INSTANCE.writeValueAsString(authProvider.getFreshToken());
assertEquals(theSecondFetchTokenStr, theSecondMockTokenStr);
} finally {
mockECSMetadataService.shutdown();
@@ -382,8 +307,7 @@ public class AuthSessionTest {
}
@Test
- public void testCreateDlfAuthProviderByECSTokenProviderWithInvalidRole()
- throws IOException, InterruptedException {
+ public void testCreateDlfAuthProviderByECSTokenProviderWithInvalidRole()
throws IOException {
MockECSMetadataService mockECSMetadataService = new
MockECSMetadataService("EcsTestRole");
mockECSMetadataService.start();
try {
@@ -398,13 +322,10 @@ public class AuthSessionTest {
options.set(DLF_TOKEN_ECS_ROLE_NAME.key(), "CustomRole");
options.set(RESTCatalogOptions.URI.key(), "serverUrl");
options.set(DLF_REGION.key(), "cn-hangzhou");
- assertThrows(
- RuntimeException.class,
- () -> {
- AuthProvider authProvider =
- AuthProviderFactory.createAuthProvider(
- AuthProviderEnum.DLF.identifier(),
options);
- });
+ options.set(TOKEN_PROVIDER, "dlf");
+ DLFAuthProvider authProvider =
+ (DLFAuthProvider)
AuthProviderFactory.createAuthProvider(options);
+ assertThrows(RuntimeException.class, authProvider::getFreshToken);
} finally {
mockECSMetadataService.shutdown();
}
@@ -416,7 +337,7 @@ public class AuthSessionTest {
Pair<File, String> tokenFile2Token =
generateTokenAndWriteToFile(fileName);
String tokenStr = tokenFile2Token.getRight();
String serverUrl = "https://dlf-cn-hangzhou.aliyuncs.com";
- AuthProvider authProvider = generateDLFAuthProvider(Optional.empty(),
fileName, serverUrl);
+ AuthProvider authProvider = generateDLFAuthProvider(fileName,
serverUrl);
DLFToken token = OBJECT_MAPPER_INSTANCE.readValue(tokenStr,
DLFToken.class);
Map<String, String> parameters = new HashMap<>();
parameters.put("k1", "v1");
@@ -424,7 +345,8 @@ public class AuthSessionTest {
String data = "data";
RESTAuthParameter restAuthParameter =
new RESTAuthParameter("/path", parameters, "method", "data");
- Map<String, String> header = authProvider.header(new HashMap<>(),
restAuthParameter);
+ Map<String, String> header =
+ authProvider.mergeAuthHeader(new HashMap<>(),
restAuthParameter);
String authorization = header.get(DLF_AUTHORIZATION_HEADER_KEY);
String[] credentials = authorization.split(",")[0].split("
")[1].split("/");
String dateTime = header.get(DLF_DATE_HEADER_KEY);
@@ -456,8 +378,7 @@ public class AuthSessionTest {
private Pair<File, String> generateTokenAndWriteToFile(String fileName)
throws IOException {
File tokenFile = folder.newFile(fileName);
- ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
- String expiration = now.format(TOKEN_DATE_FORMATTER);
+ String expiration =
ZonedDateTime.now(ZoneOffset.UTC).format(TOKEN_DATE_FORMATTER);
String secret = UUID.randomUUID().toString();
DLFToken token = new DLFToken("accessKeyId", secret, "securityToken",
expiration);
String tokenStr = OBJECT_MAPPER_INSTANCE.writeValueAsString(token);
@@ -466,23 +387,23 @@ public class AuthSessionTest {
}
private DLFToken generateToken() {
+ return generateToken(ZonedDateTime.now(ZoneOffset.UTC));
+ }
+
+ private DLFToken generateToken(ZonedDateTime expireTime) {
String accessKeyId = UUID.randomUUID().toString();
String accessKeySecret = UUID.randomUUID().toString();
String securityToken = UUID.randomUUID().toString();
- ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
- String expiration = now.format(TOKEN_DATE_FORMATTER);
+ String expiration = expireTime.format(TOKEN_DATE_FORMATTER);
return new DLFToken(accessKeyId, accessKeySecret, securityToken,
expiration);
}
- private AuthProvider generateDLFAuthProvider(
- Optional<Long> tokenRefreshInMillsOpt, String fileName, String
serverUrl) {
+ private DLFAuthProvider generateDLFAuthProvider(String fileName, String
serverUrl) {
Options options = new Options();
options.set(DLF_TOKEN_PATH.key(), folder.getRoot().getPath() + "/" +
fileName);
options.set(RESTCatalogOptions.URI.key(), serverUrl);
options.set(DLF_REGION.key(), "cn-hangzhou");
- tokenRefreshInMillsOpt.ifPresent(
- tokenRefreshInMills ->
- options.set(TOKEN_REFRESH_TIME.key(),
tokenRefreshInMills + "ms"));
- return
AuthProviderFactory.createAuthProvider(AuthProviderEnum.DLF.identifier(),
options);
+ options.set(TOKEN_PROVIDER, "dlf");
+ return (DLFAuthProvider)
AuthProviderFactory.createAuthProvider(options);
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/auth/CustomTestDLFTokenLoader.java
b/paimon-core/src/test/java/org/apache/paimon/rest/auth/CustomTestDLFTokenLoader.java
index 279c70080b..139df16dea 100644
---
a/paimon-core/src/test/java/org/apache/paimon/rest/auth/CustomTestDLFTokenLoader.java
+++
b/paimon-core/src/test/java/org/apache/paimon/rest/auth/CustomTestDLFTokenLoader.java
@@ -21,7 +21,7 @@ package org.apache.paimon.rest.auth;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
-import static org.apache.paimon.rest.auth.DLFAuthProvider.TOKEN_DATE_FORMATTER;
+import static org.apache.paimon.rest.auth.DLFToken.TOKEN_DATE_FORMATTER;
/** Custom {@link DLFTokenLoader} implementation used by tests. */
public class CustomTestDLFTokenLoader implements DLFTokenLoader {
@@ -39,4 +39,9 @@ public class CustomTestDLFTokenLoader implements
DLFTokenLoader {
public DLFToken loadToken() {
return token;
}
+
+ @Override
+ public String description() {
+ return "custom";
+ }
}