This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new bc407713e9 [core] Support DLF auth (#4983)
bc407713e9 is described below
commit bc407713e9b60813222600982919a1285f75aeb8
Author: jerry <[email protected]>
AuthorDate: Mon Feb 24 17:54:43 2025 +0800
[core] Support DLF auth (#4983)
---
docs/content/concepts/rest-catalog.md | 18 +-
.../java/org/apache/paimon/utils/StringUtils.java | 8 +
.../java/org/apache/paimon/rest/HttpClient.java | 89 +++++++--
.../java/org/apache/paimon/rest/RESTCatalog.java | 89 ++++++---
.../org/apache/paimon/rest/RESTCatalogOptions.java | 50 +++--
.../java/org/apache/paimon/rest/RESTClient.java | 19 +-
.../org/apache/paimon/rest/auth/AuthProvider.java | 39 +---
...okenAuthProvider.java => AuthProviderEnum.java} | 38 ++--
...nAuthProvider.java => AuthProviderFactory.java} | 44 ++---
.../org/apache/paimon/rest/auth/AuthSession.java | 46 ++---
.../paimon/rest/auth/BearTokenAuthProvider.java | 17 +-
...ider.java => BearTokenAuthProviderFactory.java} | 30 +--
.../rest/auth/BearTokenFileAuthProvider.java | 101 ----------
.../apache/paimon/rest/auth/DLFAuthProvider.java | 207 ++++++++++++++++++++
.../paimon/rest/auth/DLFAuthProviderFactory.java | 65 +++++++
.../apache/paimon/rest/auth/DLFAuthSignature.java | 206 ++++++++++++++++++++
.../java/org/apache/paimon/rest/auth/DLFToken.java | 92 +++++++++
...okenAuthProvider.java => RESTAuthFunction.java} | 34 ++--
.../apache/paimon/rest/auth/RESTAuthParameter.java | 64 ++++++
.../services/org.apache.paimon.factories.Factory | 2 +
.../org/apache/paimon/rest/HttpClientTest.java | 48 ++++-
.../org/apache/paimon/rest/RESTCatalogServer.java | 10 +-
.../org/apache/paimon/rest/RESTCatalogTest.java | 22 +++
.../apache/paimon/rest/auth/AuthProviderTest.java | 82 --------
.../apache/paimon/rest/auth/AuthSessionTest.java | 214 +++++++++++++++++----
.../paimon/rest/auth/DLFAuthSignatureTest.java | 61 ++++++
.../org/apache/paimon/flink/RESTCatalogITCase.java | 2 +
27 files changed, 1224 insertions(+), 473 deletions(-)
diff --git a/docs/content/concepts/rest-catalog.md
b/docs/content/concepts/rest-catalog.md
index 4645d86418..55613f5107 100644
--- a/docs/content/concepts/rest-catalog.md
+++ b/docs/content/concepts/rest-catalog.md
@@ -70,7 +70,8 @@ WITH (
'dlf.accessKeySecret'='<accessKeySecret>',
);
```
-- DLF token path
+
+- DLF STS token
```sql
CREATE CATALOG `paimon-rest-catalog`
WITH (
@@ -78,7 +79,20 @@ WITH (
'uri' = '<catalog server url>',
'metastore' = 'rest',
'token.provider' = 'dlf',
-'dlf.token-path' = '<token-path>'
+'dlf.accessKeyId'='<accessKeyId>',
+'dlf.accessKeySecret'='<accessKeySecret>',
+'dlf.securityToken'='<securityToken>'
+);
+```
+
+- DLF STS token path
+```sql
+CREATE CATALOG `paimon-rest-catalog`
+WITH (
+'type' = 'paimon',
+'uri' = '<catalog server url>',
+'metastore' = 'rest',
+'token.provider' = 'dlf'
);
```
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
b/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
index c4e07e0a69..5aa4e65f4c 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
@@ -558,4 +558,12 @@ public class StringUtils {
}
return true;
}
+
+ // Null-safe trim: returns null for null input, otherwise value.trim().
+ public static String trim(String value) {
+ if (value == null) {
+ return null;
+ }
+ return value.trim();
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java
b/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java
index 5a13a51ef7..8aa3da86d8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java
@@ -20,8 +20,11 @@ package org.apache.paimon.rest;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.options.Options;
+import org.apache.paimon.rest.auth.RESTAuthFunction;
+import org.apache.paimon.rest.auth.RESTAuthParameter;
import org.apache.paimon.rest.exceptions.RESTException;
import org.apache.paimon.rest.responses.ErrorResponse;
+import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.StringUtils;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
@@ -36,12 +39,17 @@ import okhttp3.RequestBody;
import okhttp3.Response;
import java.io.IOException;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
import static okhttp3.ConnectionSpec.CLEARTEXT;
import static okhttp3.ConnectionSpec.COMPATIBLE_TLS;
@@ -82,32 +90,38 @@ public class HttpClient implements RESTClient {
@Override
public <T extends RESTResponse> T get(
- String path, Class<T> responseType, Map<String, String> headers) {
+ String path, Class<T> responseType, RESTAuthFunction
restAuthFunction) {
+ Map<String, String> authHeaders = getHeaders(path, "GET", "",
restAuthFunction);
Request request =
new Request.Builder()
.url(getRequestUrl(path))
.get()
- .headers(Headers.of(headers))
+ .headers(Headers.of(authHeaders))
.build();
return exec(request, responseType);
}
@Override
public <T extends RESTResponse> T post(
- String path, RESTRequest body, Map<String, String> headers) {
- return post(path, body, null, headers);
+ String path, RESTRequest body, RESTAuthFunction restAuthFunction) {
+ return post(path, body, null, restAuthFunction);
}
@Override
public <T extends RESTResponse> T post(
- String path, RESTRequest body, Class<T> responseType, Map<String,
String> headers) {
+ String path,
+ RESTRequest body,
+ Class<T> responseType,
+ RESTAuthFunction restAuthFunction) {
try {
- RequestBody requestBody = buildRequestBody(body);
+ String bodyStr = OBJECT_MAPPER.writeValueAsString(body);
+ Map<String, String> authHeaders = getHeaders(path, "POST",
bodyStr, restAuthFunction);
+ RequestBody requestBody = buildRequestBody(bodyStr);
Request request =
new Request.Builder()
.url(getRequestUrl(path))
.post(requestBody)
- .headers(Headers.of(headers))
+ .headers(Headers.of(authHeaders))
.build();
return exec(request, responseType);
} catch (JsonProcessingException e) {
@@ -116,26 +130,29 @@ public class HttpClient implements RESTClient {
}
@Override
- public <T extends RESTResponse> T delete(String path, Map<String, String>
headers) {
+ public <T extends RESTResponse> T delete(String path, RESTAuthFunction
restAuthFunction) {
+ Map<String, String> authHeaders = getHeaders(path, "DELETE", "",
restAuthFunction);
Request request =
new Request.Builder()
.url(getRequestUrl(path))
.delete()
- .headers(Headers.of(headers))
+ .headers(Headers.of(authHeaders))
.build();
return exec(request, null);
}
@Override
public <T extends RESTResponse> T delete(
- String path, RESTRequest body, Map<String, String> headers) {
+ String path, RESTRequest body, RESTAuthFunction restAuthFunction) {
try {
- RequestBody requestBody = buildRequestBody(body);
+ String bodyStr = OBJECT_MAPPER.writeValueAsString(body);
+ Map<String, String> authHeaders = getHeaders(path, "DELETE",
bodyStr, restAuthFunction);
+ RequestBody requestBody = buildRequestBody(bodyStr);
Request request =
new Request.Builder()
.url(getRequestUrl(path))
.delete(requestBody)
- .headers(Headers.of(headers))
+ .headers(Headers.of(authHeaders))
.build();
return exec(request, null);
} catch (JsonProcessingException e) {
@@ -182,12 +199,8 @@ public class HttpClient implements RESTClient {
}
}
- private RequestBody buildRequestBody(RESTRequest body) throws
JsonProcessingException {
- return RequestBody.create(OBJECT_MAPPER.writeValueAsBytes(body),
MEDIA_TYPE);
- }
-
- private String getRequestUrl(String path) {
- return StringUtils.isNullOrWhitespaceOnly(path) ? uri : uri + path;
+ private static RequestBody buildRequestBody(String body) throws
JsonProcessingException {
+ return RequestBody.create(body.getBytes(StandardCharsets.UTF_8),
MEDIA_TYPE);
}
private static OkHttpClient createHttpClient(HttpClientOptions
httpClientOptions) {
@@ -221,4 +234,44 @@ public class HttpClient implements RESTClient {
return builder.build();
}
+
+ private String getRequestUrl(String path) {
+ return StringUtils.isNullOrWhitespaceOnly(path) ? uri : uri + path;
+ }
+
+ private Map<String, String> getHeaders(
+ String path,
+ String method,
+ String data,
+ Function<RESTAuthParameter, Map<String, String>> headerFunction) {
+ Pair<String, Map<String, String>> resourcePath2Parameters =
parsePath(path);
+ RESTAuthParameter restAuthParameter =
+ new RESTAuthParameter(
+ URI.create(uri).getHost(),
+ resourcePath2Parameters.getLeft(),
+ resourcePath2Parameters.getValue(),
+ method,
+ data);
+ return headerFunction.apply(restAuthParameter);
+ }
+
+ @VisibleForTesting
+ protected static Pair<String, Map<String, String>> parsePath(String path) {
+ String[] paths = path.split("\\?");
+ String resourcePath = paths[0];
+ if (paths.length == 1) {
+ return Pair.of(resourcePath, Collections.emptyMap());
+ }
+ String query = paths[1];
+ Map<String, String> parameters =
+ Arrays.stream(query.split("&"))
+ .map(pair -> pair.split("=", 2))
+ .collect(
+ Collectors.toMap(
+ pair -> pair[0].trim(), // key
+ pair -> pair[1].trim(), // value
+ (existing, replacement) -> existing //
handle duplicates
+ ));
+ return Pair.of(resourcePath, parameters);
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index 6469859e80..d7c5990aa6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -32,6 +32,8 @@ import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.rest.auth.AuthSession;
+import org.apache.paimon.rest.auth.RESTAuthFunction;
+import org.apache.paimon.rest.auth.RESTAuthParameter;
import org.apache.paimon.rest.exceptions.AlreadyExistsException;
import org.apache.paimon.rest.exceptions.BadRequestException;
import org.apache.paimon.rest.exceptions.ForbiddenException;
@@ -108,6 +110,7 @@ public class RESTCatalog implements Catalog {
private final CatalogContext context;
private final boolean dataTokenEnabled;
private final FileIO fileIO;
+ private RESTAuthFunction restAuthFunction;
private volatile ScheduledExecutorService refreshExecutor = null;
@@ -118,23 +121,25 @@ public class RESTCatalog implements Catalog {
public RESTCatalog(CatalogContext context, boolean configRequired) {
this.client = new HttpClient(context.options());
this.catalogAuth = createAuthSession(context.options(),
tokenRefreshExecutor());
-
Options options = context.options();
+ Map<String, String> baseHeaders = Collections.emptyMap();
if (configRequired) {
if (context.options().contains(WAREHOUSE)) {
throw new IllegalArgumentException("Can not config warehouse
in RESTCatalog.");
}
- Map<String, String> initHeaders =
- RESTUtil.merge(
- extractPrefixMap(context.options(), HEADER_PREFIX),
- catalogAuth.getHeaders());
+ baseHeaders = extractPrefixMap(context.options(), HEADER_PREFIX);
options =
new Options(
- client.get(ResourcePaths.V1_CONFIG,
ConfigResponse.class, initHeaders)
+ client.get(
+ ResourcePaths.V1_CONFIG,
+ ConfigResponse.class,
+ new RESTAuthFunction(
+ Collections.emptyMap(),
catalogAuth))
.merge(context.options().toMap()));
+ baseHeaders.putAll(extractPrefixMap(options, HEADER_PREFIX));
}
-
+ this.restAuthFunction = new RESTAuthFunction(baseHeaders, catalogAuth);
context = CatalogContext.create(options, context.preferIO(),
context.fallbackIO());
this.context = context;
this.resourcePaths = ResourcePaths.forCatalogProperties(options);
@@ -156,7 +161,8 @@ public class RESTCatalog implements Catalog {
@Override
public List<String> listDatabases() {
ListDatabasesResponse response =
- client.get(resourcePaths.databases(),
ListDatabasesResponse.class, headers());
+ client.get(
+ resourcePaths.databases(),
ListDatabasesResponse.class, restAuthFunction);
if (response.getDatabases() != null) {
return response.getDatabases();
}
@@ -170,7 +176,10 @@ public class RESTCatalog implements Catalog {
CreateDatabaseRequest request = new CreateDatabaseRequest(name,
properties);
try {
client.post(
- resourcePaths.databases(), request,
CreateDatabaseResponse.class, headers());
+ resourcePaths.databases(),
+ request,
+ CreateDatabaseResponse.class,
+ restAuthFunction);
} catch (AlreadyExistsException e) {
if (!ignoreIfExists) {
throw new DatabaseAlreadyExistException(name);
@@ -187,7 +196,10 @@ public class RESTCatalog implements Catalog {
}
try {
GetDatabaseResponse response =
- client.get(resourcePaths.database(name),
GetDatabaseResponse.class, headers());
+ client.get(
+ resourcePaths.database(name),
+ GetDatabaseResponse.class,
+ restAuthFunction);
return new Database.DatabaseImpl(
name, response.options(), response.comment().orElseGet(()
-> null));
} catch (NoSuchResourceException e) {
@@ -205,7 +217,7 @@ public class RESTCatalog implements Catalog {
if (!cascade && !this.listTables(name).isEmpty()) {
throw new DatabaseNotEmptyException(name);
}
- client.delete(resourcePaths.database(name), headers());
+ client.delete(resourcePaths.database(name), restAuthFunction);
} catch (NoSuchResourceException | DatabaseNotExistException e) {
if (!ignoreIfNotExists) {
throw new DatabaseNotExistException(name);
@@ -231,7 +243,7 @@ public class RESTCatalog implements Catalog {
resourcePaths.databaseProperties(name),
request,
AlterDatabaseResponse.class,
- headers());
+ restAuthFunction);
if (response.getUpdated().isEmpty()) {
throw new IllegalStateException("Failed to update properties");
}
@@ -251,7 +263,7 @@ public class RESTCatalog implements Catalog {
client.get(
resourcePaths.tables(databaseName),
ListTablesResponse.class,
- headers());
+ restAuthFunction);
if (response.getTables() != null) {
return response.getTables();
}
@@ -290,7 +302,7 @@ public class RESTCatalog implements Catalog {
return client.get(
resourcePaths.tableToken(identifier.getDatabaseName(),
identifier.getObjectName()),
GetTableTokenResponse.class,
- catalogAuth.getHeaders());
+ restAuthFunction);
}
public boolean commitSnapshot(Identifier identifier, Snapshot snapshot) {
@@ -300,7 +312,7 @@ public class RESTCatalog implements Catalog {
resourcePaths.commitTable(identifier.getDatabaseName()),
request,
CommitTableResponse.class,
- headers());
+ restAuthFunction);
return response.isSuccess();
}
@@ -312,7 +324,7 @@ public class RESTCatalog implements Catalog {
resourcePaths.table(
identifier.getDatabaseName(),
identifier.getTableName()),
GetTableResponse.class,
- headers());
+ restAuthFunction);
} catch (NoSuchResourceException e) {
throw new TableNotExistException(identifier);
} catch (ForbiddenException e) {
@@ -331,7 +343,9 @@ public class RESTCatalog implements Catalog {
checkNotSystemTable(identifier, "createTable");
validateAutoCreateClose(schema.options());
CreateTableRequest request = new CreateTableRequest(identifier,
schema);
- client.post(resourcePaths.tables(identifier.getDatabaseName()),
request, headers());
+ client.post(
+ resourcePaths.tables(identifier.getDatabaseName()),
request, restAuthFunction);
+
} catch (AlreadyExistsException e) {
if (!ignoreIfExists) {
throw new TableAlreadyExistException(identifier);
@@ -356,7 +370,10 @@ public class RESTCatalog implements Catalog {
checkNotSystemTable(toTable, "renameTable");
try {
RenameTableRequest request = new RenameTableRequest(fromTable,
toTable);
-
client.post(resourcePaths.renameTable(fromTable.getDatabaseName()), request,
headers());
+ client.post(
+ resourcePaths.renameTable(fromTable.getDatabaseName()),
+ request,
+ restAuthFunction);
} catch (NoSuchResourceException e) {
if (!ignoreIfNotExists) {
throw new TableNotExistException(fromTable);
@@ -378,7 +395,7 @@ public class RESTCatalog implements Catalog {
client.post(
resourcePaths.table(identifier.getDatabaseName(),
identifier.getTableName()),
request,
- headers());
+ restAuthFunction);
} catch (NoSuchResourceException e) {
if (!ignoreIfNotExists) {
if (e.resourceType() == ErrorResponseResourceType.TABLE) {
@@ -408,7 +425,7 @@ public class RESTCatalog implements Catalog {
try {
client.delete(
resourcePaths.table(identifier.getDatabaseName(),
identifier.getTableName()),
- headers());
+ restAuthFunction);
} catch (NoSuchResourceException e) {
if (!ignoreIfNotExists) {
throw new TableNotExistException(identifier);
@@ -427,7 +444,7 @@ public class RESTCatalog implements Catalog {
resourcePaths.partitions(
identifier.getDatabaseName(),
identifier.getTableName()),
request,
- headers());
+ restAuthFunction);
} catch (NoSuchResourceException e) {
throw new TableNotExistException(identifier);
} catch (NotImplementedException ignored) {
@@ -444,7 +461,7 @@ public class RESTCatalog implements Catalog {
resourcePaths.dropPartitions(
identifier.getDatabaseName(),
identifier.getTableName()),
request,
- headers());
+ restAuthFunction);
} catch (NoSuchResourceException e) {
throw new TableNotExistException(identifier);
} catch (NotImplementedException ignored) {
@@ -470,7 +487,7 @@ public class RESTCatalog implements Catalog {
resourcePaths.alterPartitions(
identifier.getDatabaseName(),
identifier.getTableName()),
request,
- headers());
+ restAuthFunction);
} catch (NoSuchResourceException e) {
throw new TableNotExistException(identifier);
} catch (NotImplementedException ignored) {
@@ -487,7 +504,7 @@ public class RESTCatalog implements Catalog {
resourcePaths.markDonePartitions(
identifier.getDatabaseName(),
identifier.getTableName()),
request,
- headers());
+ restAuthFunction);
} catch (NoSuchResourceException e) {
throw new TableNotExistException(identifier);
} catch (NotImplementedException ignored) {
@@ -503,7 +520,7 @@ public class RESTCatalog implements Catalog {
resourcePaths.partitions(
identifier.getDatabaseName(),
identifier.getTableName()),
ListPartitionsResponse.class,
- headers());
+ restAuthFunction);
if (response == null || response.getPartitions() == null) {
return Collections.emptyList();
}
@@ -526,7 +543,7 @@ public class RESTCatalog implements Catalog {
resourcePaths.view(
identifier.getDatabaseName(),
identifier.getTableName()),
GetViewResponse.class,
- headers());
+ restAuthFunction);
ViewSchema schema = response.getSchema();
return new ViewImpl(
identifier,
@@ -546,7 +563,7 @@ public class RESTCatalog implements Catalog {
try {
client.delete(
resourcePaths.view(identifier.getDatabaseName(),
identifier.getTableName()),
- headers());
+ restAuthFunction);
} catch (NoSuchResourceException e) {
if (!ignoreIfNotExists) {
throw new ViewNotExistException(identifier);
@@ -566,7 +583,9 @@ public class RESTCatalog implements Catalog {
view.comment().orElse(null),
view.options());
CreateViewRequest request = new CreateViewRequest(identifier,
schema);
- client.post(resourcePaths.views(identifier.getDatabaseName()),
request, headers());
+ client.post(
+ resourcePaths.views(identifier.getDatabaseName()),
request, restAuthFunction);
+
} catch (NoSuchResourceException e) {
throw new DatabaseNotExistException(identifier.getDatabaseName());
} catch (AlreadyExistsException e) {
@@ -581,7 +600,9 @@ public class RESTCatalog implements Catalog {
try {
ListViewsResponse response =
client.get(
- resourcePaths.views(databaseName),
ListViewsResponse.class, headers());
+ resourcePaths.views(databaseName),
+ ListViewsResponse.class,
+ restAuthFunction);
return response.getViews();
} catch (NoSuchResourceException e) {
throw new DatabaseNotExistException(databaseName);
@@ -593,7 +614,11 @@ public class RESTCatalog implements Catalog {
throws ViewNotExistException, ViewAlreadyExistException {
try {
RenameTableRequest request = new RenameTableRequest(fromView,
toView);
- client.post(resourcePaths.renameView(fromView.getDatabaseName()),
request, headers());
+ client.post(
+ resourcePaths.renameView(fromView.getDatabaseName()),
+ request,
+ restAuthFunction);
+
} catch (NoSuchResourceException e) {
if (!ignoreIfNotExists) {
throw new ViewNotExistException(fromView);
@@ -618,8 +643,8 @@ public class RESTCatalog implements Catalog {
}
}
- private Map<String, String> headers() {
- return catalogAuth.getHeaders();
+ Map<String, String> headers(RESTAuthParameter restAuthParameter) {
+ return restAuthFunction.apply(restAuthParameter);
}
private ScheduledExecutorService tokenRefreshExecutor() {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java
index 35b72469bd..310e3335ec 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java
@@ -60,23 +60,49 @@ public class RESTCatalogOptions {
ConfigOptions.key("token")
.stringType()
.noDefaultValue()
- .withDescription("REST Catalog auth token.");
+ .withDescription("REST Catalog auth bear token.");
- public static final ConfigOption<Duration> TOKEN_EXPIRATION_TIME =
- ConfigOptions.key("token.expiration-time")
+ public static final ConfigOption<Duration> TOKEN_REFRESH_TIME =
+ ConfigOptions.key("token.refresh-time")
.durationType()
.defaultValue(Duration.ofHours(1))
- .withDescription(
- "REST Catalog auth token expires time.The token
generates system refresh frequency is t1,"
- + " the token expires time is t2, we need
to guarantee that t2 > t1,"
- + " the token validity time is [t2 - t1,
t2],"
- + " and the expires time defined here
needs to be less than (t2 - t1)");
-
- public static final ConfigOption<String> TOKEN_PROVIDER_PATH =
- ConfigOptions.key("token.provider.path")
+ .withDescription("REST Catalog auth token refresh time.");
+
+ public static final ConfigOption<String> TOKEN_PROVIDER =
+ ConfigOptions.key("token.provider")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("REST Catalog auth token provider.");
+
+ public static final ConfigOption<String> DLF_TOKEN_PATH =
+ ConfigOptions.key("dlf.token-path")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("REST Catalog auth DLF token file path.");
+
+ public static final ConfigOption<String> DLF_ACCESS_KEY_ID =
+ ConfigOptions.key("dlf.accessKeyId")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("REST Catalog auth DLF access key id");
+
+ public static final ConfigOption<String> DLF_ACCESS_KEY_SECRET =
+ ConfigOptions.key("dlf.accessKeySecret")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("REST Catalog auth DLF access key
secret");
+
+ public static final ConfigOption<String> DLF_SECURITY_TOKEN =
+ ConfigOptions.key("dlf.securityToken")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("REST Catalog auth DLF security token");
+
+ public static final ConfigOption<String> DLF_ROLE_SESSION_NAME =
+ ConfigOptions.key("dlf.roleSessionName")
.stringType()
.noDefaultValue()
- .withDescription("REST Catalog auth token provider path.");
+ .withDescription("REST Catalog auth DLF role session
name");
public static final ConfigOption<Boolean> DATA_TOKEN_ENABLED =
ConfigOptions.key("data-token.enabled")
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTClient.java
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTClient.java
index d816f09ed0..7446836653 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTClient.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTClient.java
@@ -18,20 +18,27 @@
package org.apache.paimon.rest;
+import org.apache.paimon.rest.auth.RESTAuthFunction;
+
import java.io.Closeable;
-import java.util.Map;
/** Interface for a basic HTTP Client for interfacing with the REST catalog. */
public interface RESTClient extends Closeable {
- <T extends RESTResponse> T get(String path, Class<T> responseType,
Map<String, String> headers);
+ <T extends RESTResponse> T get(
+ String path, Class<T> responseType, RESTAuthFunction
restAuthFunction);
- <T extends RESTResponse> T post(String path, RESTRequest body, Map<String,
String> headers);
+ <T extends RESTResponse> T post(
+ String path, RESTRequest body, RESTAuthFunction restAuthFunction);
<T extends RESTResponse> T post(
- String path, RESTRequest body, Class<T> responseType, Map<String,
String> headers);
+ String path,
+ RESTRequest body,
+ Class<T> responseType,
+ RESTAuthFunction restAuthFunction);
- <T extends RESTResponse> T delete(String path, Map<String, String>
headers);
+ <T extends RESTResponse> T delete(String path, RESTAuthFunction
restAuthFunction);
- <T extends RESTResponse> T delete(String path, RESTRequest body,
Map<String, String> headers);
+ <T extends RESTResponse> T delete(
+ String path, RESTRequest body, RESTAuthFunction restAuthFunction);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthProvider.java
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthProvider.java
index 26c9490dc9..666337ec79 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthProvider.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthProvider.java
@@ -18,27 +18,16 @@
package org.apache.paimon.rest.auth;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.rest.RESTCatalogOptions;
-import org.apache.paimon.utils.StringUtils;
-
import java.util.Map;
import java.util.Optional;
-import static org.apache.paimon.rest.RESTCatalogOptions.TOKEN_EXPIRATION_TIME;
-import static org.apache.paimon.rest.RESTCatalogOptions.TOKEN_PROVIDER_PATH;
-
/** Authentication provider. */
public interface AuthProvider {
- Map<String, String> authHeader();
+ Map<String, String> header(Map<String, String> baseHeader,
RESTAuthParameter restAuthParameter);
boolean refresh();
- default boolean supportRefresh() {
- return false;
- }
-
default boolean keepRefreshed() {
return false;
}
@@ -51,31 +40,7 @@ public interface AuthProvider {
return Optional.empty();
}
- default Optional<Long> expiresInMills() {
+ default Optional<Long> tokenRefreshInMills() {
return Optional.empty();
}
-
- static AuthProvider create(Options options) {
- if
(options.getOptional(RESTCatalogOptions.TOKEN_PROVIDER_PATH).isPresent()) {
- if (!options.getOptional(TOKEN_PROVIDER_PATH).isPresent()) {
- throw new IllegalArgumentException(TOKEN_PROVIDER_PATH.key() +
" is required");
- }
- String tokenFilePath = options.get(TOKEN_PROVIDER_PATH);
- if (options.getOptional(TOKEN_EXPIRATION_TIME).isPresent()) {
- long tokenExpireInMills =
options.get(TOKEN_EXPIRATION_TIME).toMillis();
- return new BearTokenFileAuthProvider(tokenFilePath,
tokenExpireInMills);
-
- } else {
- return new BearTokenFileAuthProvider(tokenFilePath);
- }
- } else {
- if (options.getOptional(RESTCatalogOptions.TOKEN)
- .map(StringUtils::isNullOrWhitespaceOnly)
- .orElse(true)) {
- throw new IllegalArgumentException(
- RESTCatalogOptions.TOKEN.key() + " is required and not
empty");
- }
- return new
BearTokenAuthProvider(options.get(RESTCatalogOptions.TOKEN));
- }
- }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenAuthProvider.java
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthProviderEnum.java
similarity index 55%
copy from
paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenAuthProvider.java
copy to
paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthProviderEnum.java
index 73e5081a5e..c84a38a485 100644
---
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenAuthProvider.java
+++
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthProviderEnum.java
@@ -18,33 +18,27 @@
package org.apache.paimon.rest.auth;
-import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
+/** AuthProviderEnum. */
+public enum AuthProviderEnum {
+ BEAR("bear"),
+ DLF("dlf");
-import java.util.Map;
+ private final String identifier;
-/** Auth provider for bear token. */
-public class BearTokenAuthProvider implements AuthProvider {
-
- private static final String AUTHORIZATION_HEADER = "Authorization";
- private static final String BEARER_PREFIX = "Bearer ";
-
- protected String token;
-
- public BearTokenAuthProvider(String token) {
- this.token = token;
- }
-
- public String token() {
- return token;
+ AuthProviderEnum(String identifier) {
+ this.identifier = identifier;
}
- @Override
- public Map<String, String> authHeader() {
- return ImmutableMap.of(AUTHORIZATION_HEADER, BEARER_PREFIX + token);
+ public String identifier() {
+ return identifier;
}
- @Override
- public boolean refresh() {
- return true;
+ public static AuthProviderEnum fromString(String identifier) {
+ for (AuthProviderEnum authProviderEnum : AuthProviderEnum.values()) {
+ if (authProviderEnum.identifier.equals(identifier)) {
+ return authProviderEnum;
+ }
+ }
+ throw new IllegalArgumentException("Unknown AuthProvider type: " +
identifier);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenAuthProvider.java
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthProviderFactory.java
similarity index 54%
copy from
paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenAuthProvider.java
copy to
paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthProviderFactory.java
index 73e5081a5e..1cccfeb626 100644
---
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenAuthProvider.java
+++
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthProviderFactory.java
@@ -18,33 +18,21 @@
package org.apache.paimon.rest.auth;
-import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
-
-import java.util.Map;
-
-/** Auth provider for bear token. */
-public class BearTokenAuthProvider implements AuthProvider {
-
- private static final String AUTHORIZATION_HEADER = "Authorization";
- private static final String BEARER_PREFIX = "Bearer ";
-
- protected String token;
-
- public BearTokenAuthProvider(String token) {
- this.token = token;
- }
-
- public String token() {
- return token;
- }
-
- @Override
- public Map<String, String> authHeader() {
- return ImmutableMap.of(AUTHORIZATION_HEADER, BEARER_PREFIX + token);
- }
-
- @Override
- public boolean refresh() {
- return true;
+import org.apache.paimon.factories.Factory;
+import org.apache.paimon.factories.FactoryUtil;
+import org.apache.paimon.options.Options;
+
+/** Factory for {@link AuthProvider}. */
+public interface AuthProviderFactory extends Factory {
+
+ AuthProvider create(Options options);
+
+ static AuthProvider createAuthProvider(String name, Options options) {
+ AuthProviderFactory factory =
+ FactoryUtil.discoverFactory(
+ AuthProviderFactory.class.getClassLoader(),
+ AuthProviderFactory.class,
+ name);
+ return factory.create(options);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthSession.java
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthSession.java
index 5f77002c0d..1ce391e00d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthSession.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthSession.java
@@ -20,19 +20,16 @@ package org.apache.paimon.rest.auth;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.options.Options;
-import org.apache.paimon.rest.RESTUtil;
+import org.apache.paimon.rest.RESTCatalogOptions;
+import org.apache.paimon.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import static org.apache.paimon.rest.RESTCatalog.HEADER_PREFIX;
-import static org.apache.paimon.rest.RESTUtil.extractPrefixMap;
-
/** Authentication session. */
public class AuthSession {
@@ -43,18 +40,14 @@ public class AuthSession {
private static final Logger LOG =
LoggerFactory.getLogger(AuthSession.class);
private final AuthProvider authProvider;
- private volatile Map<String, String> headers;
- public AuthSession(Map<String, String> headers, AuthProvider authProvider)
{
+ public AuthSession(AuthProvider authProvider) {
this.authProvider = authProvider;
- this.headers = RESTUtil.merge(headers, this.authProvider.authHeader());
}
public static AuthSession fromRefreshAuthProvider(
- ScheduledExecutorService executor,
- Map<String, String> headers,
- AuthProvider authProvider) {
- AuthSession session = new AuthSession(headers, authProvider);
+ ScheduledExecutorService executor, AuthProvider authProvider) {
+ AuthSession session = new AuthSession(authProvider);
long startTimeMillis = System.currentTimeMillis();
Optional<Long> expiresAtMillisOpt = authProvider.expiresAtMillis();
@@ -75,23 +68,17 @@ public class AuthSession {
return session;
}
- public Map<String, String> getHeaders() {
+ public AuthProvider getAuthProvider() {
if (this.authProvider.keepRefreshed() &&
this.authProvider.willSoonExpire()) {
refresh();
}
- return headers;
+ return this.authProvider;
}
public Boolean refresh() {
- if (this.authProvider.supportRefresh()
- && this.authProvider.keepRefreshed()
- && this.authProvider.expiresInMills().isPresent()) {
- boolean isSuccessful = this.authProvider.refresh();
- if (isSuccessful) {
- Map<String, String> currentHeaders = this.headers;
- this.headers = RESTUtil.merge(currentHeaders,
this.authProvider.authHeader());
- }
- return isSuccessful;
+ if (this.authProvider.keepRefreshed()
+ && this.authProvider.tokenRefreshInMills().isPresent()) {
+ return this.authProvider.refresh();
}
return false;
@@ -142,7 +129,7 @@ public class AuthSession {
scheduleTokenRefresh(
executor,
session,
- refreshStartTime +
session.authProvider.expiresInMills().get(),
+ refreshStartTime +
session.authProvider.tokenRefreshInMills().get(),
0);
} else {
scheduleTokenRefresh(executor, session, expiresAtMillis,
retryTimes + 1);
@@ -151,12 +138,15 @@ public class AuthSession {
public static AuthSession createAuthSession(
Options options, ScheduledExecutorService refreshExecutor) {
- Map<String, String> baseHeader = extractPrefixMap(options,
HEADER_PREFIX);
- AuthProvider authProvider = AuthProvider.create(options);
+ String tokenProvider = options.get(RESTCatalogOptions.TOKEN_PROVIDER);
+ if (StringUtils.isEmpty(tokenProvider)) {
+ throw new IllegalArgumentException("token.provider is not set.");
+ }
+ AuthProvider authProvider =
AuthProviderFactory.createAuthProvider(tokenProvider, options);
if (authProvider.keepRefreshed()) {
- return AuthSession.fromRefreshAuthProvider(refreshExecutor,
baseHeader, authProvider);
+ return AuthSession.fromRefreshAuthProvider(refreshExecutor,
authProvider);
} else {
- return new AuthSession(baseHeader, authProvider);
+ return new AuthSession(authProvider);
}
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenAuthProvider.java
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenAuthProvider.java
index 73e5081a5e..a501b98556 100644
---
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenAuthProvider.java
+++
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenAuthProvider.java
@@ -18,14 +18,14 @@
package org.apache.paimon.rest.auth;
-import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
-
+import java.util.HashMap;
import java.util.Map;
/** Auth provider for bear token. */
public class BearTokenAuthProvider implements AuthProvider {
- private static final String AUTHORIZATION_HEADER = "Authorization";
+ public static final String AUTHORIZATION_HEADER_KEY = "Authorization";
+
private static final String BEARER_PREFIX = "Bearer ";
protected String token;
@@ -34,13 +34,12 @@ public class BearTokenAuthProvider implements AuthProvider {
this.token = token;
}
- public String token() {
- return token;
- }
-
@Override
- public Map<String, String> authHeader() {
- return ImmutableMap.of(AUTHORIZATION_HEADER, BEARER_PREFIX + token);
+ public Map<String, String> header(
+ Map<String, String> baseHeader, RESTAuthParameter
restAuthParameter) {
+ Map<String, String> headersWithAuth = new HashMap<>(baseHeader);
+ headersWithAuth.put(AUTHORIZATION_HEADER_KEY, BEARER_PREFIX + token);
+ return headersWithAuth;
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenAuthProvider.java
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenAuthProviderFactory.java
similarity index 56%
copy from
paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenAuthProvider.java
copy to
paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenAuthProviderFactory.java
index 73e5081a5e..a32c526bb0 100644
---
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenAuthProvider.java
+++
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenAuthProviderFactory.java
@@ -18,33 +18,19 @@
package org.apache.paimon.rest.auth;
-import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.rest.RESTCatalogOptions;
-import java.util.Map;
-
-/** Auth provider for bear token. */
-public class BearTokenAuthProvider implements AuthProvider {
-
- private static final String AUTHORIZATION_HEADER = "Authorization";
- private static final String BEARER_PREFIX = "Bearer ";
-
- protected String token;
-
- public BearTokenAuthProvider(String token) {
- this.token = token;
- }
-
- public String token() {
- return token;
- }
+/** Factory for {@link BearTokenAuthProvider}. */
+public class BearTokenAuthProviderFactory implements AuthProviderFactory {
@Override
- public Map<String, String> authHeader() {
- return ImmutableMap.of(AUTHORIZATION_HEADER, BEARER_PREFIX + token);
+ public String identifier() {
+ return AuthProviderEnum.BEAR.identifier();
}
@Override
- public boolean refresh() {
- return true;
+ public AuthProvider create(Options options) {
+ return new
BearTokenAuthProvider(options.get(RESTCatalogOptions.TOKEN));
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenFileAuthProvider.java
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenFileAuthProvider.java
deleted file mode 100644
index a1ecb0b26c..0000000000
---
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenFileAuthProvider.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.rest.auth;
-
-import org.apache.paimon.utils.FileIOUtils;
-import org.apache.paimon.utils.StringUtils;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.util.Optional;
-
-/** Auth provider for get bear token from file. */
-public class BearTokenFileAuthProvider extends BearTokenAuthProvider {
-
- public static final double EXPIRED_FACTOR = 0.4;
-
- private final String tokenFilePath;
-
- private boolean keepRefreshed = false;
- private Long expiresAtMillis = null;
- private Long expiresInMills = null;
-
- public BearTokenFileAuthProvider(String tokenFilePath) {
- super(readToken(tokenFilePath));
- this.tokenFilePath = tokenFilePath;
- }
-
- public BearTokenFileAuthProvider(String tokenFilePath, Long
expiresInMills) {
- this(tokenFilePath);
- this.keepRefreshed = true;
- this.expiresAtMillis = -1L;
- this.expiresInMills = expiresInMills;
- }
-
- @Override
- public boolean refresh() {
- long start = System.currentTimeMillis();
- String newToken = readToken(tokenFilePath);
- if (StringUtils.isNullOrWhitespaceOnly(newToken)) {
- return false;
- }
- this.expiresAtMillis = start + this.expiresInMills;
- this.token = newToken;
- return true;
- }
-
- @Override
- public boolean supportRefresh() {
- return true;
- }
-
- @Override
- public boolean keepRefreshed() {
- return this.keepRefreshed;
- }
-
- @Override
- public boolean willSoonExpire() {
- if (keepRefreshed()) {
- return expiresAtMillis().get() - System.currentTimeMillis()
- < expiresInMills().get() * EXPIRED_FACTOR;
- } else {
- return false;
- }
- }
-
- @Override
- public Optional<Long> expiresAtMillis() {
- return Optional.ofNullable(this.expiresAtMillis);
- }
-
- @Override
- public Optional<Long> expiresInMills() {
- return Optional.ofNullable(this.expiresInMills);
- }
-
- private static String readToken(String filePath) {
- try {
- return FileIOUtils.readFileUtf8(new File(filePath));
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- }
-}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFAuthProvider.java
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFAuthProvider.java
new file mode 100644
index 0000000000..7b70cb3545
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFAuthProvider.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.auth;
+
+import org.apache.paimon.utils.FileIOUtils;
+
+import okhttp3.MediaType;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.paimon.rest.RESTObjectMapper.OBJECT_MAPPER;
+
+/** Auth provider for <b>Alibaba Cloud</b> DLF. */
+public class DLFAuthProvider implements AuthProvider {
+
+ public static final String DLF_HOST_HEADER_KEY = "Host";
+ public static final String DLF_AUTHORIZATION_HEADER_KEY = "Authorization";
+ public static final String DLF_CONTENT_MD5_HEADER_KEY = "Content-MD5";
+ public static final String DLF_CONTENT_TYPE_KEY = "Content-Type";
+ public static final String DLF_DATE_HEADER_KEY = "x-dlf-date";
+ public static final String DLF_SECURITY_TOKEN_HEADER_KEY =
"x-dlf-security-token";
+ public static final String DLF_AUTH_VERSION_HEADER_KEY = "x-dlf-version";
+ public static final String DLF_CONTENT_SHA56_HEADER_KEY =
"x-dlf-content-sha256";
+ public static final String DLF_CONTENT_SHA56_VALUE = "UNSIGNED-PAYLOAD";
+ public static final double EXPIRED_FACTOR = 0.4;
+ public static final DateTimeFormatter TOKEN_DATE_FORMATTER =
+ DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'");
+ public static final DateTimeFormatter AUTH_DATE_TIME_FORMATTER =
+ DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmss'Z'");
+ public static final DateTimeFormatter AUTH_DATE_FORMATTER =
+ DateTimeFormatter.ofPattern("yyyyMMdd");
+ protected static final MediaType MEDIA_TYPE =
MediaType.parse("application/json");
+ private static final long[] READ_TOKEN_FILE_BACKOFF_WAIT_TIME_MILLIS =
{1_000, 3_000, 5_000};
+
+ private final String tokenFilePath;
+
+ protected DLFToken token;
+ private final boolean keepRefreshed;
+ private Long expiresAtMillis;
+ private final Long tokenRefreshInMills;
+ private final String region;
+
+ public static DLFAuthProvider buildRefreshToken(
+ String tokenFilePath, Long tokenRefreshInMills, String region) {
+ DLFToken token = readToken(tokenFilePath, 0);
+ Long expiresAtMillis = getExpirationInMills(token.getExpiration());
+ return new DLFAuthProvider(
+ tokenFilePath, token, true, expiresAtMillis,
tokenRefreshInMills, region);
+ }
+
+ public static DLFAuthProvider buildAKToken(
+ String accessKeyId, String accessKeySecret, String securityToken,
String region) {
+ DLFToken token = new DLFToken(accessKeyId, accessKeySecret,
securityToken, null);
+ return new DLFAuthProvider(null, token, false, null, null, region);
+ }
+
+ public DLFAuthProvider(
+ String tokenFilePath,
+ DLFToken token,
+ boolean keepRefreshed,
+ Long expiresAtMillis,
+ Long tokenRefreshInMills,
+ String region) {
+ this.tokenFilePath = tokenFilePath;
+ this.token = token;
+ this.keepRefreshed = keepRefreshed;
+ this.expiresAtMillis = expiresAtMillis;
+ this.tokenRefreshInMills = tokenRefreshInMills;
+ this.region = region;
+ }
+
+ @Override
+ public Map<String, String> header(
+ Map<String, String> baseHeader, RESTAuthParameter
restAuthParameter) {
+ try {
+ ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
+ String date = now.format(AUTH_DATE_FORMATTER);
+ String dateTime = now.format(AUTH_DATE_TIME_FORMATTER);
+ Map<String, String> signHeaders =
+ generateSignHeaders(
+ restAuthParameter.host(),
+ restAuthParameter.data(),
+ dateTime,
+ token.getSecurityToken());
+ String authorization =
+ DLFAuthSignature.getAuthorization(
+ restAuthParameter, token, region, signHeaders,
dateTime, date);
+ Map<String, String> headersWithAuth = new HashMap<>(baseHeader);
+ headersWithAuth.putAll(signHeaders);
+ headersWithAuth.put(DLF_AUTHORIZATION_HEADER_KEY, authorization);
+ return headersWithAuth;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static Map<String, String> generateSignHeaders(
+ String host, String data, String dateTime, String securityToken)
throws Exception {
+ Map<String, String> signHeaders = new HashMap<>();
+ signHeaders.put(DLF_DATE_HEADER_KEY, dateTime);
+ signHeaders.put(DLF_HOST_HEADER_KEY, host);
+ signHeaders.put(DLF_CONTENT_SHA56_HEADER_KEY, DLF_CONTENT_SHA56_VALUE);
+ signHeaders.put(DLF_AUTH_VERSION_HEADER_KEY, DLFAuthSignature.VERSION);
+ if (data != null && !data.isEmpty()) {
+ signHeaders.put(DLF_CONTENT_TYPE_KEY, MEDIA_TYPE.toString());
+ signHeaders.put(DLF_CONTENT_MD5_HEADER_KEY,
DLFAuthSignature.md5(data));
+ }
+ if (securityToken != null) {
+ signHeaders.put(DLF_SECURITY_TOKEN_HEADER_KEY, securityToken);
+ }
+ return signHeaders;
+ }
+
+ @Override
+ public boolean refresh() {
+ long start = System.currentTimeMillis();
+ DLFToken newToken = readToken(tokenFilePath, 0);
+ if (newToken == null) {
+ return false;
+ }
+ this.expiresAtMillis = start + this.tokenRefreshInMills;
+ this.token = newToken;
+ return true;
+ }
+
+ @Override
+ public boolean keepRefreshed() {
+ return this.keepRefreshed;
+ }
+
+ @Override
+ public boolean willSoonExpire() {
+ if (keepRefreshed()) {
+ return expiresAtMillis().get() - System.currentTimeMillis()
+ < tokenRefreshInMills().get() * EXPIRED_FACTOR;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public Optional<Long> expiresAtMillis() {
+ return Optional.ofNullable(this.expiresAtMillis);
+ }
+
+ @Override
+ public Optional<Long> tokenRefreshInMills() {
+ return Optional.ofNullable(this.tokenRefreshInMills);
+ }
+
+ protected static DLFToken readToken(String tokenFilePath, int retryTimes) {
+ try {
+ File tokenFile = new File(tokenFilePath);
+ if (tokenFile.exists()) {
+ String tokenStr = FileIOUtils.readFileUtf8(tokenFile);
+ return OBJECT_MAPPER.readValue(tokenStr, DLFToken.class);
+ } else if (retryTimes <
READ_TOKEN_FILE_BACKOFF_WAIT_TIME_MILLIS.length - 1) {
+
Thread.sleep(READ_TOKEN_FILE_BACKOFF_WAIT_TIME_MILLIS[retryTimes]);
+ return readToken(tokenFilePath, retryTimes + 1);
+ } else {
+ throw new FileNotFoundException(tokenFilePath);
+ }
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static Long getExpirationInMills(String dateStr) {
+ try {
+ if (dateStr == null) {
+ return null;
+ }
+ LocalDateTime dateTime = LocalDateTime.parse(dateStr,
TOKEN_DATE_FORMATTER);
+ return dateTime.atZone(ZoneOffset.UTC).toInstant().toEpochMilli();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFAuthProviderFactory.java
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFAuthProviderFactory.java
new file mode 100644
index 0000000000..a78c44dea2
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFAuthProviderFactory.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.auth;
+
+import org.apache.paimon.options.Options;
+import org.apache.paimon.rest.RESTCatalogOptions;
+
+import static org.apache.paimon.rest.RESTCatalogOptions.DLF_TOKEN_PATH;
+import static org.apache.paimon.rest.RESTCatalogOptions.TOKEN_REFRESH_TIME;
+import static org.apache.paimon.rest.RESTCatalogOptions.URI;
+
+/** Factory for {@link DLFAuthProvider}. */
+public class DLFAuthProviderFactory implements AuthProviderFactory {
+
+ @Override
+ public String identifier() {
+ return AuthProviderEnum.DLF.identifier();
+ }
+
+ @Override
+ public AuthProvider create(Options options) {
+ String region = getRegion(options);
+ if
(options.getOptional(RESTCatalogOptions.DLF_TOKEN_PATH).isPresent()) {
+ String tokenFilePath = options.get(DLF_TOKEN_PATH);
+ long tokenRefreshInMills =
options.get(TOKEN_REFRESH_TIME).toMillis();
+ return DLFAuthProvider.buildRefreshToken(tokenFilePath,
tokenRefreshInMills, region);
+ } else if
(options.getOptional(RESTCatalogOptions.DLF_ACCESS_KEY_ID).isPresent()
+ &&
options.getOptional(RESTCatalogOptions.DLF_ACCESS_KEY_SECRET).isPresent()) {
+ return DLFAuthProvider.buildAKToken(
+ options.get(RESTCatalogOptions.DLF_ACCESS_KEY_ID),
+ options.get(RESTCatalogOptions.DLF_ACCESS_KEY_SECRET),
+ options.get(RESTCatalogOptions.DLF_SECURITY_TOKEN),
+ region);
+ }
+ throw new IllegalArgumentException("DLF token path or AK must be set
for DLF Auth.");
+ }
+
+ private static String getRegion(Options options) {
+ String region = "undefined";
+ try {
+ String[] paths = options.get(URI).split("\\.");
+ if (paths.length > 1) {
+ region = paths[1];
+ }
+ } catch (Exception ignore) {
+ }
+ return region;
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFAuthSignature.java
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFAuthSignature.java
new file mode 100644
index 0000000000..b138d38c6c
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFAuthSignature.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.auth;
+
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.paimon.shade.guava30.com.google.common.base.Joiner;
+
+import javax.crypto.Mac;
+import javax.crypto.spec.SecretKeySpec;
+
+import java.security.MessageDigest;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static
org.apache.paimon.rest.auth.DLFAuthProvider.DLF_AUTH_VERSION_HEADER_KEY;
+import static
org.apache.paimon.rest.auth.DLFAuthProvider.DLF_CONTENT_MD5_HEADER_KEY;
+import static
org.apache.paimon.rest.auth.DLFAuthProvider.DLF_CONTENT_SHA56_HEADER_KEY;
+import static org.apache.paimon.rest.auth.DLFAuthProvider.DLF_CONTENT_TYPE_KEY;
+import static org.apache.paimon.rest.auth.DLFAuthProvider.DLF_DATE_HEADER_KEY;
+import static org.apache.paimon.rest.auth.DLFAuthProvider.DLF_HOST_HEADER_KEY;
+import static
org.apache.paimon.rest.auth.DLFAuthProvider.DLF_SECURITY_TOKEN_HEADER_KEY;
+
+/** Generates the authorization value for <b>Alibaba Cloud</b> DLF. */
+public class DLFAuthSignature {
+
+ public static final String VERSION = "v1";
+
+ private static final String SIGNATURE_ALGORITHM = "DLF4-HMAC-SHA256";
+ private static final String PRODUCT = "DlfNext";
+ private static final String HMAC_SHA256 = "HmacSHA256";
+ private static final String REQUEST_TYPE = "aliyun_v4_request";
+ private static final String ADDITIONAL_HEADERS_KEY = "AdditionalHeaders";
+ private static final String SIGNATURE_KEY = "Signature";
+ private static final String NEW_LINE = "\n";
+ private static final List<String> SIGNED_HEADERS =
+ Arrays.asList(
+ DLF_CONTENT_MD5_HEADER_KEY.toLowerCase(),
+ DLF_CONTENT_TYPE_KEY.toLowerCase(),
+ DLF_CONTENT_SHA56_HEADER_KEY.toLowerCase(),
+ DLF_DATE_HEADER_KEY.toLowerCase(),
+ DLF_AUTH_VERSION_HEADER_KEY.toLowerCase(),
+ DLF_SECURITY_TOKEN_HEADER_KEY.toLowerCase());
+ // must be in alphabetical order
+ private static final List<String> ADDITIONAL_HEADERS =
+ Arrays.asList(DLF_HOST_HEADER_KEY.toLowerCase());
+
+ public static String getAuthorization(
+ RESTAuthParameter restAuthParameter,
+ DLFToken dlfToken,
+ String region,
+ Map<String, String> headers,
+ String dateTime,
+ String date)
+ throws Exception {
+ String canonicalRequest = getCanonicalRequest(restAuthParameter,
headers);
+ String stringToSign =
+ Joiner.on(NEW_LINE)
+ .join(
+ SIGNATURE_ALGORITHM,
+ dateTime,
+ String.format("%s/%s/%s/%s", date, region,
PRODUCT, REQUEST_TYPE),
+ sha256Hex(canonicalRequest));
+ byte[] dateKey = hmacSha256(("aliyun_v4" +
dlfToken.getAccessKeySecret()).getBytes(), date);
+ byte[] dateRegionKey = hmacSha256(dateKey, region);
+ byte[] dateRegionServiceKey = hmacSha256(dateRegionKey, PRODUCT);
+ byte[] signingKey = hmacSha256(dateRegionServiceKey, REQUEST_TYPE);
+ byte[] result = hmacSha256(signingKey, stringToSign);
+ String signature = hexEncode(result);
+ String authorization =
+ Joiner.on(",")
+ .join(
+ String.format(
+ "%s Credential=%s/%s/%s/%s/%s",
+ SIGNATURE_ALGORITHM,
+ dlfToken.getAccessKeyId(),
+ date,
+ region,
+ PRODUCT,
+ REQUEST_TYPE),
+ String.format(
+ "%s=%s",
+ ADDITIONAL_HEADERS_KEY,
+
Joiner.on(",").join(ADDITIONAL_HEADERS)),
+ String.format("%s=%s", SIGNATURE_KEY,
signature));
+ return authorization;
+ }
+
+ public static String md5(String raw) throws Exception {
+ MessageDigest messageDigest = MessageDigest.getInstance("MD5");
+ messageDigest.update(raw.getBytes(UTF_8.name()));
+ byte[] md5 = messageDigest.digest();
+ return new String(Base64.getEncoder().encodeToString(md5));
+ }
+
+ private static byte[] hmacSha256(byte[] key, String data) {
+ try {
+ SecretKeySpec secretKeySpec = new SecretKeySpec(key, HMAC_SHA256);
+ Mac mac = Mac.getInstance(HMAC_SHA256);
+ mac.init(secretKeySpec);
+ byte[] hmacBytes = mac.doFinal(data.getBytes());
+ return hmacBytes;
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to calculate HMAC-SHA256", e);
+ }
+ }
+
+ public static String getCanonicalRequest(
+ RESTAuthParameter restAuthParameter, Map<String, String> headers)
throws Exception {
+ String canonicalRequest =
+ Joiner.on(NEW_LINE)
+ .join(restAuthParameter.method(),
restAuthParameter.resourcePath());
+ // Canonical Query String + "\n" +
+ TreeMap<String, String> orderMap = new TreeMap<>();
+ if (restAuthParameter.parameters() != null) {
+ orderMap.putAll(restAuthParameter.parameters());
+ }
+ String separator = "";
+ StringBuilder canonicalPart = new StringBuilder();
+ for (Map.Entry<String, String> param : orderMap.entrySet()) {
+
canonicalPart.append(separator).append(StringUtils.trim(param.getKey()));
+ if (param.getValue() != null && !param.getValue().isEmpty()) {
+
canonicalPart.append("=").append((StringUtils.trim(param.getValue())));
+ }
+ separator = "&";
+ }
+ canonicalRequest = Joiner.on(NEW_LINE).join(canonicalRequest,
canonicalPart);
+
+ // Canonical Headers + "\n" +
+ TreeMap<String, String> sortedSignedHeadersMap =
buildSortedSignedHeadersMap(headers);
+ for (Map.Entry<String, String> header :
sortedSignedHeadersMap.entrySet()) {
+ canonicalRequest =
+ Joiner.on(NEW_LINE)
+ .join(
+ canonicalRequest,
+ String.format("%s:%s", header.getKey(),
header.getValue()));
+ }
+
+ // Additional Headers + "\n" +
+ String additionalSignedHeaders =
Joiner.on(";").join(ADDITIONAL_HEADERS);
+ canonicalRequest = Joiner.on(NEW_LINE).join(canonicalRequest,
additionalSignedHeaders);
+ String contentSha56 =
+ headers.getOrDefault(
+ DLF_CONTENT_SHA56_HEADER_KEY,
DLFAuthProvider.DLF_CONTENT_SHA56_VALUE);
+ return Joiner.on(NEW_LINE).join(canonicalRequest, contentSha56);
+ }
+
+ private static TreeMap<String, String> buildSortedSignedHeadersMap(
+ Map<String, String> headers) {
+ TreeMap<String, String> orderMap = new TreeMap<>();
+ if (headers != null) {
+ for (Map.Entry<String, String> header : headers.entrySet()) {
+ String key = header.getKey().toLowerCase();
+ if (SIGNED_HEADERS.contains(key) ||
ADDITIONAL_HEADERS.contains(key)) {
+ orderMap.put(key, StringUtils.trim(header.getValue()));
+ }
+ }
+ }
+ return orderMap;
+ }
+
+ private static String sha256Hex(String raw) throws Exception {
+ MessageDigest digest = MessageDigest.getInstance("SHA-256");
+ byte[] hash = digest.digest(raw.getBytes(UTF_8.name()));
+ return hexEncode(hash);
+ }
+
+ private static String hexEncode(byte[] raw) {
+ if (raw == null) {
+ return null;
+ } else {
+ StringBuilder sb = new StringBuilder();
+
+ for (byte b : raw) {
+ String hex = Integer.toHexString(b & 255);
+ if (hex.length() < 2) {
+ sb.append(0);
+ }
+
+ sb.append(hex);
+ }
+
+ return sb.toString();
+ }
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFToken.java
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFToken.java
new file mode 100644
index 0000000000..c961dfe38b
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFToken.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.auth;
+
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+/** <b>Alibaba Cloud</b> DLF token. */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class DLFToken {
+
+ private static final String ACCESS_KEY_ID_FIELD_NAME = "AccessKeyId";
+ private static final String ACCESS_KEY_SECRET_FIELD_NAME =
"AccessKeySecret";
+ private static final String SECURITY_TOKEN_FIELD_NAME = "SecurityToken";
+ private static final String EXPIRATION_FIELD_NAME = "Expiration";
+
+ @JsonProperty(ACCESS_KEY_ID_FIELD_NAME)
+ private final String accessKeyId;
+
+ @JsonProperty(ACCESS_KEY_SECRET_FIELD_NAME)
+ private final String accessKeySecret;
+
+ @JsonProperty(SECURITY_TOKEN_FIELD_NAME)
+ private final String securityToken;
+
+ @JsonProperty(EXPIRATION_FIELD_NAME)
+ private final String expiration;
+
+ @JsonCreator
+ public DLFToken(
+ @JsonProperty(ACCESS_KEY_ID_FIELD_NAME) String accessKeyId,
+ @JsonProperty(ACCESS_KEY_SECRET_FIELD_NAME) String accessKeySecret,
+ @JsonProperty(SECURITY_TOKEN_FIELD_NAME) String securityToken,
+ @JsonProperty(EXPIRATION_FIELD_NAME) String expiration) {
+ this.accessKeyId = accessKeyId;
+ this.accessKeySecret = accessKeySecret;
+ this.securityToken = securityToken;
+ this.expiration = expiration;
+ }
+
+ public String getAccessKeyId() {
+ return accessKeyId;
+ }
+
+ public String getAccessKeySecret() {
+ return accessKeySecret;
+ }
+
+ public String getSecurityToken() {
+ return securityToken;
+ }
+
+ public String getExpiration() {
+ return expiration;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ DLFToken that = (DLFToken) o;
+ return Objects.equals(accessKeyId, that.accessKeyId)
+ && Objects.equals(accessKeySecret, that.accessKeySecret)
+ && Objects.equals(securityToken, that.securityToken)
+ && Objects.equals(expiration, that.expiration);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(accessKeyId, accessKeySecret, securityToken,
expiration);
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenAuthProvider.java
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/RESTAuthFunction.java
similarity index 56%
copy from
paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenAuthProvider.java
copy to
paimon-core/src/main/java/org/apache/paimon/rest/auth/RESTAuthFunction.java
index 73e5081a5e..b76701845f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenAuthProvider.java
+++
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/RESTAuthFunction.java
@@ -18,33 +18,25 @@
package org.apache.paimon.rest.auth;
-import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
-
import java.util.Map;
+import java.util.function.Function;
-/** Auth provider for bear token. */
-public class BearTokenAuthProvider implements AuthProvider {
-
- private static final String AUTHORIZATION_HEADER = "Authorization";
- private static final String BEARER_PREFIX = "Bearer ";
+/** The function used to generate auth header for the rest request. */
+public class RESTAuthFunction implements Function<RESTAuthParameter,
Map<String, String>> {
- protected String token;
-
- public BearTokenAuthProvider(String token) {
- this.token = token;
- }
+ private final Map<String, String> initHeader;
+ private final AuthSession authSession;
- public String token() {
- return token;
- }
-
- @Override
- public Map<String, String> authHeader() {
- return ImmutableMap.of(AUTHORIZATION_HEADER, BEARER_PREFIX + token);
+ public RESTAuthFunction(Map<String, String> initHeader, AuthSession
authSession) {
+ this.initHeader = initHeader;
+ this.authSession = authSession;
}
@Override
- public boolean refresh() {
- return true;
+ public Map<String, String> apply(RESTAuthParameter restAuthParameter) {
+ if (authSession != null) {
+ return authSession.getAuthProvider().header(initHeader,
restAuthParameter);
+ }
+ return initHeader;
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/RESTAuthParameter.java
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/RESTAuthParameter.java
new file mode 100644
index 0000000000..f0bbf50495
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/RESTAuthParameter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.auth;
+
+import java.util.Map;
+
+/**
+ * Immutable holder for the parts of a REST request that an auth provider needs in order to build
+ * the auth header: host, resource path, parameters, method and request data.
+ *
+ * <p>Values are stored as given; the parameter map is neither copied nor wrapped.
+ */
+public class RESTAuthParameter {
+
+    private final String host;
+    private final String resourcePath;
+    private final Map<String, String> parameters;
+    private final String method;
+    private final String data;
+
+    /**
+     * Creates a parameter object from the individual request parts.
+     *
+     * @param host host of the request
+     * @param resourcePath path of the requested resource
+     * @param parameters request parameters by name (presumably the query parameters; confirm
+     *     against the caller)
+     * @param method request method
+     * @param data request data (presumably the serialized request body; confirm against the
+     *     caller)
+     */
+    public RESTAuthParameter(
+            String host,
+            String resourcePath,
+            Map<String, String> parameters,
+            String method,
+            String data) {
+        this.host = host;
+        this.resourcePath = resourcePath;
+        this.parameters = parameters;
+        this.method = method;
+        this.data = data;
+    }
+
+    /** Returns the host of the request. */
+    public String host() {
+        return this.host;
+    }
+
+    /** Returns the path of the requested resource. */
+    public String resourcePath() {
+        return this.resourcePath;
+    }
+
+    /** Returns the same parameter map instance that was passed to the constructor. */
+    public Map<String, String> parameters() {
+        return this.parameters;
+    }
+
+    /** Returns the request method. */
+    public String method() {
+        return this.method;
+    }
+
+    /** Returns the request data. */
+    public String data() {
+        return this.data;
+    }
+}
diff --git
a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 7f221e5172..81fc475d76 100644
---
a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++
b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -38,3 +38,5 @@
org.apache.paimon.mergetree.compact.aggregate.factory.FieldSumAggFactory
org.apache.paimon.mergetree.compact.aggregate.factory.FieldThetaSketchAggFactory
org.apache.paimon.rest.RESTCatalogFactory
org.apache.paimon.iceberg.migrate.IcebergMigrateHadoopMetadataFactory
+org.apache.paimon.rest.auth.BearTokenAuthProviderFactory
+org.apache.paimon.rest.auth.DLFAuthProviderFactory
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java
b/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java
index deaf6da818..8600637c88 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java
@@ -19,10 +19,15 @@
package org.apache.paimon.rest;
import org.apache.paimon.rest.auth.AuthProvider;
+import org.apache.paimon.rest.auth.AuthSession;
import org.apache.paimon.rest.auth.BearTokenAuthProvider;
+import org.apache.paimon.rest.auth.RESTAuthFunction;
import org.apache.paimon.rest.exceptions.BadRequestException;
import org.apache.paimon.rest.responses.ErrorResponse;
import org.apache.paimon.rest.responses.ErrorResponseResourceType;
+import org.apache.paimon.utils.Pair;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
import org.junit.After;
import org.junit.Before;
@@ -30,6 +35,8 @@ import org.junit.Test;
import java.io.IOException;
import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
@@ -49,6 +56,7 @@ public class HttpClientTest {
private String mockResponseDataStr;
private String errorResponseStr;
private Map<String, String> headers;
+ private RESTAuthFunction restAuthFunction;
@Before
public void setUp() throws Exception {
@@ -65,7 +73,9 @@ public class HttpClientTest {
httpClient = new HttpClient(httpClientOptions);
httpClient.setErrorHandler(errorHandler);
AuthProvider authProvider = new BearTokenAuthProvider(TOKEN);
- headers = authProvider.authHeader();
+ AuthSession authSession = new AuthSession(authProvider);
+ headers = new HashMap<>();
+ restAuthFunction = new RESTAuthFunction(headers, authSession);
}
@After
@@ -76,7 +86,7 @@ public class HttpClientTest {
@Test
public void testGetSuccess() {
server.enqueueResponse(mockResponseDataStr, 200);
- MockRESTData response = httpClient.get(MOCK_PATH, MockRESTData.class,
headers);
+ MockRESTData response = httpClient.get(MOCK_PATH, MockRESTData.class,
restAuthFunction);
assertEquals(mockResponseData.data(), response.data());
}
@@ -85,14 +95,14 @@ public class HttpClientTest {
server.enqueueResponse(errorResponseStr, 400);
assertThrows(
BadRequestException.class,
- () -> httpClient.get(MOCK_PATH, MockRESTData.class, headers));
+ () -> httpClient.get(MOCK_PATH, MockRESTData.class,
restAuthFunction));
}
@Test
public void testPostSuccess() {
server.enqueueResponse(mockResponseDataStr, 200);
MockRESTData response =
- httpClient.post(MOCK_PATH, mockResponseData,
MockRESTData.class, headers);
+ httpClient.post(MOCK_PATH, mockResponseData,
MockRESTData.class, restAuthFunction);
assertEquals(mockResponseData.data(), response.data());
}
@@ -101,19 +111,25 @@ public class HttpClientTest {
server.enqueueResponse(errorResponseStr, 400);
assertThrows(
BadRequestException.class,
- () -> httpClient.post(MOCK_PATH, mockResponseData,
ErrorResponse.class, headers));
+ () ->
+ httpClient.post(
+ MOCK_PATH,
+ mockResponseData,
+ ErrorResponse.class,
+ restAuthFunction));
}
@Test
public void testDeleteSuccess() {
server.enqueueResponse(mockResponseDataStr, 200);
- assertDoesNotThrow(() -> httpClient.delete(MOCK_PATH, headers));
+ assertDoesNotThrow(() -> httpClient.delete(MOCK_PATH,
restAuthFunction));
}
@Test
public void testDeleteFail() {
server.enqueueResponse(errorResponseStr, 400);
- assertThrows(BadRequestException.class, () ->
httpClient.delete(MOCK_PATH, headers));
+ assertThrows(
+ BadRequestException.class, () -> httpClient.delete(MOCK_PATH,
restAuthFunction));
}
@Test
@@ -124,6 +140,22 @@ public class HttpClientTest {
server.getBaseUrl(), Duration.ofSeconds(30),
1, 10, 2));
server.enqueueResponse(mockResponseDataStr, 429);
server.enqueueResponse(mockResponseDataStr, 200);
- assertDoesNotThrow(() -> httpClient.get(MOCK_PATH, MockRESTData.class,
headers));
+ assertDoesNotThrow(() -> httpClient.get(MOCK_PATH, MockRESTData.class,
restAuthFunction));
+ }
+
+    /**
+     * Checks that {@code HttpClient.parsePath} yields the resource path paired with a map of its
+     * query parameters, and an empty map when the path has no query string.
+     */
+    @Test
+    public void testParsePath() {
+        String tablesPath = "/api/v1/tables";
+        String schemasPath = "/api/v1/tables/my_table$schemas";
+
+        // Paths without a query string come back unchanged with no parameters.
+        assertEquals(Pair.of(tablesPath, Collections.emptyMap()), HttpClient.parsePath(tablesPath));
+        assertEquals(
+                Pair.of(schemasPath, Collections.emptyMap()), HttpClient.parsePath(schemasPath));
+
+        // Several '&'-separated parameters.
+        assertEquals(
+                Pair.of(tablesPath, ImmutableMap.of("pageSize", "10", "pageNum", "1")),
+                HttpClient.parsePath(tablesPath + "?pageSize=10&pageNum=1"));
+
+        // A comma inside a value stays part of that single value.
+        assertEquals(
+                Pair.of(tablesPath, ImmutableMap.of("tableName", "t1,t2")),
+                HttpClient.parsePath(tablesPath + "?tableName=t1,t2"));
     }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index f29bc3eda2..e9b5b7f0a0 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -28,6 +28,7 @@ import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.Partition;
+import org.apache.paimon.rest.auth.BearTokenAuthProvider;
import org.apache.paimon.rest.requests.AlterPartitionsRequest;
import org.apache.paimon.rest.requests.AlterTableRequest;
import org.apache.paimon.rest.requests.CommitTableRequest;
@@ -112,7 +113,8 @@ public class RESTCatalogServer {
return new Dispatcher() {
@Override
public MockResponse dispatch(RecordedRequest request) {
- String token = request.getHeaders().get("Authorization");
+ String token =
+
request.getHeaders().get(BearTokenAuthProvider.AUTHORIZATION_HEADER_KEY);
RESTResponse response;
try {
if (!("Bearer " + authToken).equals(token)) {
@@ -591,10 +593,12 @@ public class RESTCatalogServer {
private static String getConfigBody(String warehouseStr) {
return String.format(
- "{\"defaults\": {\"%s\": \"%s\", \"%s\": \"%s\"}}",
+ "{\"defaults\": {\"%s\": \"%s\", \"%s\": \"%s\", \"%s\":
\"%s\"}}",
RESTCatalogInternalOptions.PREFIX.key(),
PREFIX,
CatalogOptions.WAREHOUSE.key(),
- warehouseStr);
+ warehouseStr,
+ "header.test-header",
+ "test-value");
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index 1fb4dcbcd7..64389378bf 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -24,6 +24,9 @@ import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.Partition;
+import org.apache.paimon.rest.auth.AuthProviderEnum;
+import org.apache.paimon.rest.auth.BearTokenAuthProvider;
+import org.apache.paimon.rest.auth.RESTAuthParameter;
import org.apache.paimon.rest.exceptions.NotAuthorizedException;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
@@ -39,6 +42,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -61,6 +65,7 @@ class RESTCatalogTest extends CatalogTestBase {
Options options = new Options();
options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl());
options.set(RESTCatalogOptions.TOKEN, initToken);
+ options.set(RESTCatalogOptions.TOKEN_PROVIDER,
AuthProviderEnum.BEAR.identifier());
options.set(RESTCatalogOptions.THREAD_POOL_SIZE, 1);
this.catalog = new RESTCatalog(CatalogContext.create(options));
}
@@ -74,6 +79,7 @@ class RESTCatalogTest extends CatalogTestBase {
void testInitFailWhenDefineWarehouse() {
Options options = new Options();
options.set(CatalogOptions.WAREHOUSE, warehouse);
+ options.set(RESTCatalogOptions.TOKEN_PROVIDER,
AuthProviderEnum.BEAR.identifier());
assertThatThrownBy(() -> new
RESTCatalog(CatalogContext.create(options)))
.isInstanceOf(IllegalArgumentException.class);
}
@@ -83,12 +89,27 @@ class RESTCatalogTest extends CatalogTestBase {
Options options = new Options();
options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl());
options.set(RESTCatalogOptions.TOKEN, "aaaaa");
+ options.set(RESTCatalogOptions.TOKEN_PROVIDER,
AuthProviderEnum.BEAR.identifier());
options.set(RESTCatalogOptions.THREAD_POOL_SIZE, 1);
options.set(CatalogOptions.METASTORE, RESTCatalogFactory.IDENTIFIER);
assertThatThrownBy(() -> new
RESTCatalog(CatalogContext.create(options)))
.isInstanceOf(NotAuthorizedException.class);
}
+    /**
+     * Verifies that the headers built by the catalog contain the bearer authorization header and
+     * the {@code test-header} entry that the mock server advertises in its config defaults under
+     * the {@code header.} prefix.
+     */
+    @Test
+    void testHeader() {
+        RESTCatalog restCatalog = (RESTCatalog) catalog;
+        Map<String, String> parameters = new HashMap<>();
+        parameters.put("k1", "v1");
+        parameters.put("k2", "v2");
+        RESTAuthParameter restAuthParameter =
+                new RESTAuthParameter("host", "/path", parameters, "method", "data");
+        Map<String, String> headers = restCatalog.headers(restAuthParameter);
+        // assertEquals takes (expected, actual); keeping that order makes failure messages
+        // report the right side as "expected".
+        assertEquals(
+                "Bearer init_token", headers.get(BearTokenAuthProvider.AUTHORIZATION_HEADER_KEY));
+        assertEquals("test-value", headers.get("test-header"));
+    }
+
@Test
void testListPartitionsWhenMetastorePartitionedIsTrue() throws Exception {
Identifier identifier = Identifier.create("test_db", "test_table");
@@ -115,6 +136,7 @@ class RESTCatalogTest extends CatalogTestBase {
options.set(RESTCatalogOptions.TOKEN, initToken);
options.set(RESTCatalogOptions.THREAD_POOL_SIZE, 1);
options.set(RESTCatalogOptions.DATA_TOKEN_ENABLED, true);
+ options.set(RESTCatalogOptions.TOKEN_PROVIDER,
AuthProviderEnum.BEAR.identifier());
this.catalog = new RESTCatalog(CatalogContext.create(options));
List<Identifier> identifiers =
Lists.newArrayList(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthProviderTest.java
b/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthProviderTest.java
deleted file mode 100644
index 5fdb1a9c40..0000000000
---
a/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthProviderTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.rest.auth;
-
-import org.apache.paimon.options.Options;
-import org.apache.paimon.rest.RESTCatalogOptions;
-
-import org.apache.commons.io.FileUtils;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.time.Duration;
-import java.util.UUID;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThrows;
-
-/** Test for {@link AuthProvider}. */
-public class AuthProviderTest {
-
- @Rule public TemporaryFolder folder = new TemporaryFolder();
-
- @Test
- public void testCreateBearTokenSuccess() {
- Options options = new Options();
- String token = UUID.randomUUID().toString();
- options.set(RESTCatalogOptions.TOKEN, token);
- BearTokenAuthProvider authProvider = (BearTokenAuthProvider)
AuthProvider.create(options);
- assertEquals(token, authProvider.token());
- }
-
- @Test
- public void testCreateBearTokenFail() {
- Options options = new Options();
- assertThrows(IllegalArgumentException.class, () ->
AuthProvider.create(options));
- }
-
- @Test
- public void testCreateBearTokenFileSuccess() throws Exception {
- Options options = new Options();
- String fileName = "token";
- File tokenFile = folder.newFile(fileName);
- String token = UUID.randomUUID().toString();
- FileUtils.writeStringToFile(tokenFile, token);
- options.set(RESTCatalogOptions.TOKEN_PROVIDER_PATH,
tokenFile.getPath());
- BearTokenFileAuthProvider authProvider =
- (BearTokenFileAuthProvider) AuthProvider.create(options);
- assertEquals(token, authProvider.token());
- }
-
- @Test
- public void testCreateRefreshBearTokenFileSuccess() throws Exception {
- Options options = new Options();
- String fileName = "token";
- File tokenFile = folder.newFile(fileName);
- String token = UUID.randomUUID().toString();
- FileUtils.writeStringToFile(tokenFile, token);
- options.set(RESTCatalogOptions.TOKEN_PROVIDER_PATH,
tokenFile.getPath());
- options.set(RESTCatalogOptions.TOKEN_EXPIRATION_TIME,
Duration.ofSeconds(10L));
- BearTokenFileAuthProvider authProvider =
- (BearTokenFileAuthProvider) AuthProvider.create(options);
- assertEquals(token, authProvider.token());
- }
-}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthSessionTest.java
b/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthSessionTest.java
index b54c3103ed..e20ff5cb1d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthSessionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthSessionTest.java
@@ -18,9 +18,13 @@
package org.apache.paimon.rest.auth;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.rest.RESTCatalogOptions;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.ThreadPoolUtils;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
import org.apache.commons.io.FileUtils;
import org.junit.Rule;
import org.junit.Test;
@@ -29,16 +33,29 @@ import org.mockito.Mockito;
import java.io.File;
import java.io.IOException;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
+import static org.apache.paimon.rest.RESTCatalogOptions.DLF_ACCESS_KEY_ID;
+import static org.apache.paimon.rest.RESTCatalogOptions.DLF_ACCESS_KEY_SECRET;
+import static org.apache.paimon.rest.RESTCatalogOptions.DLF_SECURITY_TOKEN;
+import static org.apache.paimon.rest.RESTCatalogOptions.DLF_TOKEN_PATH;
+import static org.apache.paimon.rest.RESTCatalogOptions.TOKEN;
+import static org.apache.paimon.rest.RESTCatalogOptions.TOKEN_REFRESH_TIME;
import static
org.apache.paimon.rest.auth.AuthSession.MAX_REFRESH_WINDOW_MILLIS;
import static org.apache.paimon.rest.auth.AuthSession.MIN_REFRESH_WAIT_MILLIS;
import static org.apache.paimon.rest.auth.AuthSession.REFRESH_NUM_RETRIES;
+import static
org.apache.paimon.rest.auth.DLFAuthProvider.DLF_AUTHORIZATION_HEADER_KEY;
+import static org.apache.paimon.rest.auth.DLFAuthProvider.DLF_DATE_HEADER_KEY;
+import static org.apache.paimon.rest.auth.DLFAuthProvider.TOKEN_DATE_FORMATTER;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -46,6 +63,7 @@ import static org.mockito.Mockito.when;
public class AuthSessionTest {
@Rule public TemporaryFolder folder = new TemporaryFolder();
+ private static final ObjectMapper OBJECT_MAPPER_INSTANCE = new
ObjectMapper();
@Test
public void testBearToken() {
@@ -53,78 +71,72 @@ public class AuthSessionTest {
Map<String, String> initialHeaders = new HashMap<>();
initialHeaders.put("k1", "v1");
initialHeaders.put("k2", "v2");
- AuthProvider authProvider = new BearTokenAuthProvider(token);
- AuthSession session = new AuthSession(initialHeaders, authProvider);
- Map<String, String> header = session.getHeaders();
- assertEquals(header.get("Authorization"), "Bearer " + token);
- assertEquals(header.get("k1"), "v1");
- for (Map.Entry<String, String> entry : initialHeaders.entrySet()) {
- assertEquals(entry.getValue(), header.get(entry.getKey()));
- }
- assertEquals(header.size(), initialHeaders.size() + 1);
+ Options options = new Options();
+ options.set(TOKEN.key(), token);
+ AuthProvider authProvider =
+
AuthProviderFactory.createAuthProvider(AuthProviderEnum.BEAR.identifier(),
options);
+ AuthSession session = new AuthSession(authProvider);
+ Map<String, String> headers =
session.getAuthProvider().header(initialHeaders, null);
+ assertEquals(
+ headers.get(BearTokenAuthProvider.AUTHORIZATION_HEADER_KEY),
"Bearer " + token);
}
@Test
- public void testRefreshBearTokenFileAuthProvider() throws IOException,
InterruptedException {
- String fileName = "token";
+ public void testRefreshDLFAuthTokenFileAuthProvider() throws IOException,
InterruptedException {
+ String fileName = UUID.randomUUID().toString();
Pair<File, String> tokenFile2Token =
generateTokenAndWriteToFile(fileName);
String token = tokenFile2Token.getRight();
File tokenFile = tokenFile2Token.getLeft();
- Map<String, String> initialHeaders = new HashMap<>();
- long expiresInMillis = 1000L;
+ long tokenRefreshInMills = 1000;
AuthProvider authProvider =
- new BearTokenFileAuthProvider(tokenFile.getPath(),
expiresInMillis);
+ generateDLFAuthProvider(Optional.of(tokenRefreshInMills),
fileName, "serverUrl");
ScheduledExecutorService executor =
ThreadPoolUtils.createScheduledThreadPool(1, "refresh-token");
- AuthSession session =
- AuthSession.fromRefreshAuthProvider(executor, initialHeaders,
authProvider);
- Map<String, String> header = session.getHeaders();
- assertEquals(header.get("Authorization"), "Bearer " + token);
+ AuthSession session = AuthSession.fromRefreshAuthProvider(executor,
authProvider);
+ DLFAuthProvider dlfAuthProvider = (DLFAuthProvider)
session.getAuthProvider();
+ String authToken =
OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
+ assertEquals(authToken, token);
tokenFile.delete();
tokenFile2Token = generateTokenAndWriteToFile(fileName);
token = tokenFile2Token.getRight();
- Thread.sleep(expiresInMillis + 500L);
- header = session.getHeaders();
- assertEquals(header.get("Authorization"), "Bearer " + token);
+ Thread.sleep(tokenRefreshInMills * 2);
+ authToken =
OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
+ assertEquals(authToken, token);
}
@Test
public void testRefreshAuthProviderIsSoonExpire() throws IOException,
InterruptedException {
- String fileName = "token";
+ String fileName = UUID.randomUUID().toString();
Pair<File, String> tokenFile2Token =
generateTokenAndWriteToFile(fileName);
String token = tokenFile2Token.getRight();
File tokenFile = tokenFile2Token.getLeft();
- Map<String, String> initialHeaders = new HashMap<>();
- long expiresInMillis = 1000L;
+ long tokenRefreshInMills = 5000L;
AuthProvider authProvider =
- new BearTokenFileAuthProvider(tokenFile.getPath(),
expiresInMillis);
- AuthSession session =
- AuthSession.fromRefreshAuthProvider(null, initialHeaders,
authProvider);
- Map<String, String> header = session.getHeaders();
- assertEquals(header.get("Authorization"), "Bearer " + token);
+ generateDLFAuthProvider(Optional.of(tokenRefreshInMills),
fileName, "serverUrl");
+ AuthSession session = AuthSession.fromRefreshAuthProvider(null,
authProvider);
+ DLFAuthProvider dlfAuthProvider = (DLFAuthProvider)
session.getAuthProvider();
+ String authToken =
OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
+ assertEquals(token, authToken);
+ Thread.sleep((long) (tokenRefreshInMills * (1 -
DLFAuthProvider.EXPIRED_FACTOR)) + 10L);
tokenFile.delete();
tokenFile2Token = generateTokenAndWriteToFile(fileName);
token = tokenFile2Token.getRight();
tokenFile = tokenFile2Token.getLeft();
FileUtils.writeStringToFile(tokenFile, token);
- Thread.sleep(
- (long) (expiresInMillis * (1 -
BearTokenFileAuthProvider.EXPIRED_FACTOR)) + 10L);
- header = session.getHeaders();
- assertEquals(header.get("Authorization"), "Bearer " + token);
+ dlfAuthProvider = (DLFAuthProvider) session.getAuthProvider();
+ authToken =
OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
+ assertEquals(token, authToken);
}
@Test
public void testRetryWhenRefreshFail() throws Exception {
- Map<String, String> initialHeaders = new HashMap<>();
- AuthProvider authProvider =
Mockito.mock(BearTokenFileAuthProvider.class);
+ AuthProvider authProvider = Mockito.mock(DLFAuthProvider.class);
long expiresAtMillis = System.currentTimeMillis() - 1000L;
when(authProvider.expiresAtMillis()).thenReturn(Optional.of(expiresAtMillis));
- when(authProvider.expiresInMills()).thenReturn(Optional.of(50L));
- when(authProvider.supportRefresh()).thenReturn(true);
+ when(authProvider.tokenRefreshInMills()).thenReturn(Optional.of(50L));
when(authProvider.keepRefreshed()).thenReturn(true);
when(authProvider.refresh()).thenReturn(false);
- AuthSession session =
- AuthSession.fromRefreshAuthProvider(null, initialHeaders,
authProvider);
+ AuthSession session = AuthSession.fromRefreshAuthProvider(null,
authProvider);
AuthSession.scheduleTokenRefresh(
ThreadPoolUtils.createScheduledThreadPool(1, "refresh-token"),
session,
@@ -149,10 +161,128 @@ public class AuthSessionTest {
assertEquals(timeToWait, MAX_REFRESH_WINDOW_MILLIS);
}
+    /**
+     * Builds a DLF auth provider from access key id, access key secret and security token options
+     * and checks that the provider's token serializes to the same JSON as a token made from those
+     * values with no expiration.
+     */
+    @Test
+    public void testCreateDLFAuthProviderByStsToken() throws IOException {
+        String akId = UUID.randomUUID().toString();
+        String akSecret = UUID.randomUUID().toString();
+        String securityToken = UUID.randomUUID().toString();
+        DLFToken expectedToken = new DLFToken(akId, akSecret, securityToken, null);
+
+        Options options = new Options();
+        options.set(DLF_ACCESS_KEY_ID.key(), expectedToken.getAccessKeyId());
+        options.set(DLF_ACCESS_KEY_SECRET.key(), expectedToken.getAccessKeySecret());
+        options.set(DLF_SECURITY_TOKEN.key(), expectedToken.getSecurityToken());
+
+        AuthProvider authProvider =
+                AuthProviderFactory.createAuthProvider(AuthProviderEnum.DLF.identifier(), options);
+        AuthSession session = AuthSession.fromRefreshAuthProvider(null, authProvider);
+        DLFAuthProvider dlfAuthProvider = (DLFAuthProvider) session.getAuthProvider();
+
+        String actualJson = OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
+        assertEquals(OBJECT_MAPPER_INSTANCE.writeValueAsString(expectedToken), actualJson);
+    }
+
+    /**
+     * Builds a DLF auth provider from access key id and secret only and checks that the
+     * provider's token serializes to the same JSON as a token with a null security token and a
+     * null expiration.
+     */
+    @Test
+    public void testCreateDLFAuthProviderByAk() throws IOException {
+        String akId = UUID.randomUUID().toString();
+        String akSecret = UUID.randomUUID().toString();
+        DLFToken expectedToken = new DLFToken(akId, akSecret, null, null);
+
+        Options options = new Options();
+        options.set(DLF_ACCESS_KEY_ID.key(), expectedToken.getAccessKeyId());
+        options.set(DLF_ACCESS_KEY_SECRET.key(), expectedToken.getAccessKeySecret());
+
+        AuthProvider authProvider =
+                AuthProviderFactory.createAuthProvider(AuthProviderEnum.DLF.identifier(), options);
+        AuthSession session = AuthSession.fromRefreshAuthProvider(null, authProvider);
+        DLFAuthProvider dlfAuthProvider = (DLFAuthProvider) session.getAuthProvider();
+
+        String actualJson = OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
+        assertEquals(OBJECT_MAPPER_INSTANCE.writeValueAsString(expectedToken), actualJson);
+    }
+
+    /**
+     * Creates a DLF auth provider from a token file without configuring a refresh time and checks
+     * that the provider's token serializes back to the JSON that was written to the file.
+     */
+    @Test
+    public void testCreateDlfAuthProviderByFileNoDefineRefresh() throws IOException {
+        String fileName = UUID.randomUUID().toString();
+        String token = generateTokenAndWriteToFile(fileName).getRight();
+        AuthProvider authProvider =
+                generateDLFAuthProvider(Optional.empty(), fileName, "serverUrl");
+        ScheduledExecutorService executor =
+                ThreadPoolUtils.createScheduledThreadPool(1, "refresh-token");
+        try {
+            AuthSession session = AuthSession.fromRefreshAuthProvider(executor, authProvider);
+            DLFAuthProvider dlfAuthProvider = (DLFAuthProvider) session.getAuthProvider();
+            String authToken = OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
+            // assertEquals takes (expected, actual): the file content is the expectation.
+            assertEquals(token, authToken);
+        } finally {
+            // Stop the pool so its thread does not outlive the test, even when an assertion fails.
+            executor.shutdownNow();
+        }
+    }
+
+    /**
+     * Creating a DLF auth provider from empty options must fail with an
+     * {@link IllegalArgumentException}.
+     */
+    @Test
+    public void testCreateDLFAuthProviderWithoutNeedConf() {
+        String dlfIdentifier = AuthProviderEnum.DLF.identifier();
+        assertThrows(
+                IllegalArgumentException.class,
+                () -> AuthProviderFactory.createAuthProvider(dlfIdentifier, new Options()));
+    }
+
+    /**
+     * Verifies the headers the DLF auth provider produces for a request that carries data: the
+     * authorization value must equal a signature recomputed from the returned headers, and the
+     * host, security token, date, auth version, content type, content MD5 and content SHA
+     * headers must hold the expected values.
+     */
+    @Test
+    public void testDLFAuthProviderAuthHeaderWhenDataIsNotEmpty() throws Exception {
+        String fileName = UUID.randomUUID().toString();
+        Pair<File, String> tokenFile2Token = generateTokenAndWriteToFile(fileName);
+        String tokenStr = tokenFile2Token.getRight();
+        String serverUrl = "https://dlf.cn-hangzhou.aliyuncs.com";
+        AuthProvider authProvider = generateDLFAuthProvider(Optional.empty(), fileName, serverUrl);
+        DLFToken token = OBJECT_MAPPER_INSTANCE.readValue(tokenStr, DLFToken.class);
+        Map<String, String> parameters = new HashMap<>();
+        parameters.put("k1", "v1");
+        parameters.put("k2", "v2");
+        // Single source for the request data, so the signed payload and the MD5 expectation
+        // below cannot drift apart.
+        String data = "data";
+        RESTAuthParameter restAuthParameter =
+                new RESTAuthParameter(serverUrl, "/path", parameters, "method", data);
+        Map<String, String> header = authProvider.header(new HashMap<>(), restAuthParameter);
+        String authorization = header.get(DLF_AUTHORIZATION_HEADER_KEY);
+        // Take the first comma-separated element, drop the leading word before the space, and
+        // split the rest on '/'; the second component is used as the signing date.
+        String[] credentials = authorization.split(",")[0].split(" ")[1].split("/");
+        String dateTime = header.get(DLF_DATE_HEADER_KEY);
+        String date = credentials[1];
+        String newAuthorization =
+                DLFAuthSignature.getAuthorization(
+                        new RESTAuthParameter(serverUrl, "/path", parameters, "method", data),
+                        token,
+                        "cn-hangzhou",
+                        header,
+                        dateTime,
+                        date);
+        assertEquals(newAuthorization, authorization);
+        assertEquals(restAuthParameter.host(), header.get(DLFAuthProvider.DLF_HOST_HEADER_KEY));
+        assertEquals(
+                token.getSecurityToken(),
+                header.get(DLFAuthProvider.DLF_SECURITY_TOKEN_HEADER_KEY));
+        assertTrue(header.containsKey(DLF_DATE_HEADER_KEY));
+        assertEquals(
+                DLFAuthSignature.VERSION, header.get(DLFAuthProvider.DLF_AUTH_VERSION_HEADER_KEY));
+        assertEquals(
+                DLFAuthProvider.MEDIA_TYPE.toString(),
+                header.get(DLFAuthProvider.DLF_CONTENT_TYPE_KEY));
+        assertEquals(
+                DLFAuthSignature.md5(data), header.get(DLFAuthProvider.DLF_CONTENT_MD5_HEADER_KEY));
+        // NOTE(review): "SHA56" in these constant names looks like a typo for "SHA256"; the
+        // constants are declared in DLFAuthProvider, so they are used as-is here.
+        assertEquals(
+                DLFAuthProvider.DLF_CONTENT_SHA56_VALUE,
+                header.get(DLFAuthProvider.DLF_CONTENT_SHA56_HEADER_KEY));
+    }
+
private Pair<File, String> generateTokenAndWriteToFile(String fileName)
throws IOException {
File tokenFile = folder.newFile(fileName);
- String token = UUID.randomUUID().toString();
- FileUtils.writeStringToFile(tokenFile, token);
- return Pair.of(tokenFile, token);
+ ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
+ String expiration = now.format(TOKEN_DATE_FORMATTER);
+ String secret = UUID.randomUUID().toString();
+ DLFToken token = new DLFToken("accessKeyId", secret, "securityToken",
expiration);
+ String tokenStr = OBJECT_MAPPER_INSTANCE.writeValueAsString(token);
+ FileUtils.writeStringToFile(tokenFile, tokenStr);
+ return Pair.of(tokenFile, tokenStr);
+ }
+
+    /**
+     * Creates a DLF auth provider that reads its token from {@code fileName} inside the temporary
+     * folder and uses {@code serverUrl} as the catalog URI.
+     *
+     * @param tokenRefreshInMillsOpt refresh time in milliseconds; when empty, the refresh time
+     *     option is left unset
+     * @param fileName name of the token file, relative to the temporary folder root
+     * @param serverUrl value for the catalog URI option
+     * @return the provider built by the factory for the DLF identifier
+     */
+    private AuthProvider generateDLFAuthProvider(
+            Optional<Long> tokenRefreshInMillsOpt, String fileName, String serverUrl) {
+        String tokenPath = folder.getRoot().getPath() + "/" + fileName;
+        Options options = new Options();
+        options.set(DLF_TOKEN_PATH.key(), tokenPath);
+        options.set(RESTCatalogOptions.URI.key(), serverUrl);
+        if (tokenRefreshInMillsOpt.isPresent()) {
+            // The option value is a duration string, hence the "ms" suffix.
+            options.set(TOKEN_REFRESH_TIME.key(), tokenRefreshInMillsOpt.get() + "ms");
+        }
+        return AuthProviderFactory.createAuthProvider(AuthProviderEnum.DLF.identifier(), options);
     }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/auth/DLFAuthSignatureTest.java
b/paimon-core/src/test/java/org/apache/paimon/rest/auth/DLFAuthSignatureTest.java
new file mode 100644
index 0000000000..3a0caf3a49
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/rest/auth/DLFAuthSignatureTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.auth;
+
+import org.apache.paimon.rest.MockRESTMessage;
+import org.apache.paimon.rest.RESTObjectMapper;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Test for {@link DLFAuthSignature}.
+ *
+ * <p>Pins the authorization string produced for one fixed request, token, region and timestamp.
+ */
+public class DLFAuthSignatureTest {
+
+    @Test
+    public void testGetAuthorization() throws Exception {
+        String endpoint = "dlf.cn-hangzhou.aliyuncs.com";
+        String region = "cn-hangzhou";
+        // date is the day portion of dateTime; both are passed to the signer.
+        String dateTime = "20231203T121212Z";
+        String date = "20231203";
+        // Used for both the token and the sign headers so the two values cannot drift apart.
+        String securityToken = "securityToken";
+
+        Map<String, String> parameters = new HashMap<>();
+        parameters.put("k1", "v1");
+        parameters.put("k2", "v2");
+        String data =
+                RESTObjectMapper.OBJECT_MAPPER.writeValueAsString(
+                        MockRESTMessage.createDatabaseRequest("database"));
+        RESTAuthParameter restAuthParameter =
+                new RESTAuthParameter(endpoint, "/v1/paimon/databases", parameters, "POST", data);
+        DLFToken token = new DLFToken("access-key-id", "access-key-secret", securityToken, null);
+        Map<String, String> signHeaders =
+                DLFAuthProvider.generateSignHeaders(
+                        restAuthParameter.host(), restAuthParameter.data(), dateTime, securityToken);
+
+        String authorization =
+                DLFAuthSignature.getAuthorization(
+                        restAuthParameter, token, region, signHeaders, dateTime, date);
+
+        // The expected value is kept on a single line: a Java string literal cannot contain a
+        // line break, so wrapping it would make the file uncompilable.
+        Assertions.assertEquals(
+                "DLF4-HMAC-SHA256 Credential=access-key-id/20231203/cn-hangzhou/DlfNext/aliyun_v4_request,AdditionalHeaders=host,Signature=5afbdad67b52f17c47e202da2222bff9f5cf2f86c3ed973bb919a8216d086fb7",
+                authorization);
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
index c310de32fd..145fcd0ba3 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink;
import org.apache.paimon.rest.RESTCatalogOptions;
import org.apache.paimon.rest.RESTCatalogServer;
+import org.apache.paimon.rest.auth.AuthProviderEnum;
import org.apache.flink.types.Row;
import org.junit.jupiter.api.AfterEach;
@@ -101,6 +102,7 @@ class RESTCatalogITCase extends CatalogITCaseBase {
options.put(RESTCatalogOptions.URI.key(), serverUrl);
options.put(RESTCatalogOptions.TOKEN.key(), initToken);
options.put(RESTCatalogOptions.THREAD_POOL_SIZE.key(), "" + 1);
+ options.put(RESTCatalogOptions.TOKEN_PROVIDER.key(),
AuthProviderEnum.BEAR.identifier());
return options;
}