This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e41eacf9fd [core] RESTCatalog: add dlf auth it test (#5222)
e41eacf9fd is described below
commit e41eacf9fd89706c5b509662da5940179a7a0217
Author: jerry <[email protected]>
AuthorDate: Mon Mar 10 16:10:03 2025 +0800
[core] RESTCatalog: add dlf auth it test (#5222)
---
.../org/apache/paimon/rest/RESTCatalogServer.java | 502 ++++++++++-----------
.../org/apache/paimon/rest/RESTCatalogTest.java | 84 +++-
.../org/apache/paimon/flink/RESTCatalogITCase.java | 5 +-
.../paimon/spark/SparkCatalogWithRestTest.java | 5 +-
4 files changed, 337 insertions(+), 259 deletions(-)
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index 052bfeea70..7ed259aad8 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -38,7 +38,8 @@ import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.Partition;
-import org.apache.paimon.rest.auth.BearTokenAuthProvider;
+import org.apache.paimon.rest.auth.AuthProvider;
+import org.apache.paimon.rest.auth.RESTAuthParameter;
import org.apache.paimon.rest.requests.AlterDatabaseRequest;
import org.apache.paimon.rest.requests.AlterPartitionsRequest;
import org.apache.paimon.rest.requests.AlterTableRequest;
@@ -95,6 +96,7 @@ import
org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -116,8 +118,9 @@ public class RESTCatalogServer {
private static final Logger LOG =
LoggerFactory.getLogger(RESTCatalogServer.class);
public static final int DEFAULT_MAX_RESULTS = 100;
- public static final String MAX_RESULTS = "maxResults";
- public static final String PAGE_TOKEN = "pageToken";
+ public static final String MAX_RESULTS = RESTCatalog.MAX_RESULTS;
+ public static final String PAGE_TOKEN = RESTCatalog.PAGE_TOKEN;
+ public static final String AUTHORIZATION_HEADER_KEY = "Authorization";
private final String prefix;
private final String databaseUri;
@@ -125,7 +128,6 @@ public class RESTCatalogServer {
private final FileSystemCatalog catalog;
private final Dispatcher dispatcher;
private final MockWebServer server;
- private final String authToken;
private final Map<String, Database> databaseStore = new HashMap<>();
private final Map<String, TableMetadata> tableMetadataStore = new
HashMap<>();
@@ -140,14 +142,13 @@ public class RESTCatalogServer {
private ResourcePaths resourcePaths;
public RESTCatalogServer(
- String dataPath, String initToken, ConfigResponse config, String
warehouse) {
+ String dataPath, AuthProvider authProvider, ConfigResponse config,
String warehouse) {
this.warehouse = warehouse;
this.configResponse = config;
this.prefix =
this.configResponse.getDefaults().get(RESTCatalogInternalOptions.PREFIX.key());
ResourcePaths resourcePaths = new ResourcePaths(prefix);
this.databaseUri = resourcePaths.databases();
- authToken = initToken;
Options conf = new Options();
this.configResponse.getDefaults().forEach((k, v) -> conf.setString(k,
v));
conf.setString(CatalogOptions.WAREHOUSE.key(), dataPath);
@@ -161,7 +162,7 @@ public class RESTCatalogServer {
throw new UncheckedIOException(e);
}
this.catalog = new FileSystemCatalog(fileIO, warehousePath,
context.options());
- this.dispatcher = initDispatcher(authToken);
+ this.dispatcher = initDispatcher(authProvider);
MockWebServer mockWebServer = new MockWebServer();
mockWebServer.setDispatcher(dispatcher);
server = mockWebServer;
@@ -203,15 +204,40 @@ public class RESTCatalogServer {
return DataTokenStore.getDataToken(warehouse,
identifier.getFullName());
}
- public Dispatcher initDispatcher(String authToken) {
+ public Map<String, String> getHeader(RecordedRequest request) {
+ Map<String, String> headers = new HashMap<>();
+ for (Map.Entry<String, List<String>> header :
+ request.getHeaders().toMultimap().entrySet()) {
+ headers.put(header.getKey().toLowerCase(),
header.getValue().get(0));
+ }
+ return headers;
+ }
+
+ public Dispatcher initDispatcher(AuthProvider authProvider) {
return new Dispatcher() {
@Override
public MockResponse dispatch(RecordedRequest request) {
- String token =
-
request.getHeaders().get(BearTokenAuthProvider.AUTHORIZATION_HEADER_KEY);
+ String token =
request.getHeaders().get(AUTHORIZATION_HEADER_KEY);
RESTResponse response;
try {
- if (!("Bearer " + authToken).equals(token)) {
+ Map<String, String> headers = getHeader(request);
+ String[] paths = request.getPath().split("\\?");
+ String resourcePath = paths[0];
+ Map<String, String> parameters =
+ paths.length == 2 ? getParameters(paths[1]) :
Collections.emptyMap();
+ String data = request.getBody().readUtf8();
+ RESTAuthParameter restAuthParameter =
+ new RESTAuthParameter(
+ request.getHeader("Host"),
+ resourcePath,
+ parameters,
+ request.getMethod(),
+ data);
+ String authToken =
+ authProvider
+ .header(headers, restAuthParameter)
+ .get(AUTHORIZATION_HEADER_KEY);
+ if (!authToken.equals(token)) {
return new MockResponse().setResponseCode(401);
}
if (request.getPath().startsWith(resourcePaths.config())
@@ -220,7 +246,7 @@ public class RESTCatalogServer {
.equals(warehouse)) {
return mockResponse(configResponse, 200);
} else if (databaseUri.equals(request.getPath())) {
- return databasesApiHandler(request);
+ return databasesApiHandler(restAuthParameter.method(),
data);
} else if (request.getPath().startsWith(databaseUri)) {
String[] resources =
request.getPath()
@@ -324,45 +350,66 @@ public class RESTCatalogServer {
}
}
if (isDropPartitions) {
- return dropPartitionsHandle(identifier, request);
+ return dropPartitionsHandle(identifier,
restAuthParameter.data());
} else if (isAlterPartitions) {
- return alterPartitionsHandle(identifier, request);
+ return alterPartitionsHandle(identifier,
restAuthParameter.data());
} else if (isMarkDonePartitions) {
MarkDonePartitionsRequest
markDonePartitionsRequest =
- OBJECT_MAPPER.readValue(
- request.getBody().readUtf8(),
- MarkDonePartitionsRequest.class);
+ OBJECT_MAPPER.readValue(data,
MarkDonePartitionsRequest.class);
catalog.markDonePartitions(
identifier,
markDonePartitionsRequest.getPartitionSpecs());
return new MockResponse().setResponseCode(200);
} else if (isPartitions) {
- return partitionsApiHandle(request, identifier);
+ return partitionsApiHandle(
+ restAuthParameter.method(),
+ restAuthParameter.data(),
+ parameters,
+ identifier);
} else if (isBranches) {
- return branchApiHandle(resources, request,
identifier);
+ return branchApiHandle(
+ resources,
+ restAuthParameter.method(),
+ restAuthParameter.data(),
+ identifier);
} else if (isTableToken) {
return getDataTokenHandle(identifier);
} else if (isTableSnapshot) {
return snapshotHandle(identifier);
} else if (isTableRename) {
- return renameTableHandle(request);
+ return renameTableHandle(restAuthParameter.data());
} else if (isTableCommit) {
- return commitTableHandle(request);
+ return commitTableHandle(restAuthParameter.data());
} else if (isTable) {
- return tableHandle(request, identifier);
+ return tableHandle(
+ restAuthParameter.method(),
+ restAuthParameter.data(),
+ identifier);
} else if (isTables) {
- return tablesHandle(request, databaseName);
+ return tablesHandle(
+ restAuthParameter.method(),
+ restAuthParameter.data(),
+ databaseName,
+ parameters);
} else if (isTableDetails) {
- return tableDetailsHandle(request, databaseName);
+ return tableDetailsHandle(parameters,
databaseName);
} else if (isViews) {
- return viewsHandle(request, databaseName);
+ return viewsHandle(
+ restAuthParameter.method(),
+ restAuthParameter.data(),
+ databaseName,
+ parameters);
} else if (isViewsDetails) {
- return viewDetailsHandle(request, databaseName);
+ return viewDetailsHandle(
+ restAuthParameter.method(), databaseName,
parameters);
} else if (isViewRename) {
- return renameViewHandle(request);
+ return renameViewHandle(restAuthParameter.data());
} else if (isView) {
- return viewHandle(request, identifier);
+ return viewHandle(restAuthParameter.method(),
identifier);
} else {
- return databaseHandle(request, databaseName);
+ return databaseHandle(
+ restAuthParameter.method(),
+ restAuthParameter.data(),
+ databaseName);
}
}
return new MockResponse().setResponseCode(404);
@@ -526,9 +573,8 @@ public class RESTCatalogServer {
new ErrorResponse(ErrorResponseResourceType.TABLE,
null, "", 404), 404));
}
- private MockResponse commitTableHandle(RecordedRequest request) throws
Exception {
- CommitTableRequest requestBody =
- OBJECT_MAPPER.readValue(request.getBody().readUtf8(),
CommitTableRequest.class);
+ private MockResponse commitTableHandle(String data) throws Exception {
+ CommitTableRequest requestBody = OBJECT_MAPPER.readValue(data,
CommitTableRequest.class);
Identifier identifier = requestBody.getIdentifier();
if
(noPermissionTables.contains(requestBody.getIdentifier().getFullName())) {
throw new
Catalog.TableNoPermissionException(requestBody.getIdentifier());
@@ -550,17 +596,16 @@ public class RESTCatalogServer {
return mockResponse(response, 200);
}
- private MockResponse databasesApiHandler(RecordedRequest request) throws
Exception {
+ private MockResponse databasesApiHandler(String method, String data)
throws Exception {
RESTResponse response;
- switch (request.getMethod()) {
+ switch (method) {
case "GET":
List<String> databaseNameList = new
ArrayList<>(databaseStore.keySet());
response = new ListDatabasesResponse(databaseNameList);
return mockResponse(response, 200);
case "POST":
CreateDatabaseRequest requestBody =
- OBJECT_MAPPER.readValue(
- request.getBody().readUtf8(),
CreateDatabaseRequest.class);
+ OBJECT_MAPPER.readValue(data,
CreateDatabaseRequest.class);
String databaseName = requestBody.getName();
if (noPermissionDatabases.contains(databaseName)) {
throw new
Catalog.DatabaseNoPermissionException(databaseName);
@@ -575,12 +620,12 @@ public class RESTCatalogServer {
}
}
- private MockResponse databaseHandle(RecordedRequest request, String
databaseName)
+ private MockResponse databaseHandle(String method, String data, String
databaseName)
throws Exception {
RESTResponse response;
Database database;
if (databaseStore.containsKey(databaseName)) {
- switch (request.getMethod()) {
+ switch (method) {
case "GET":
database = databaseStore.get(databaseName);
response =
@@ -595,8 +640,7 @@ public class RESTCatalogServer {
return new MockResponse().setResponseCode(200);
case "POST":
AlterDatabaseRequest requestBody =
- OBJECT_MAPPER.readValue(
- request.getBody().readUtf8(),
AlterDatabaseRequest.class);
+ OBJECT_MAPPER.readValue(data,
AlterDatabaseRequest.class);
List<PropertyChange> changes = new ArrayList<>();
for (String property : requestBody.getRemovals()) {
changes.add(PropertyChange.removeProperty(property));
@@ -636,49 +680,37 @@ public class RESTCatalogServer {
return new MockResponse().setResponseCode(404);
}
- private MockResponse tablesHandle(RecordedRequest request, String
databaseName)
+ private MockResponse tablesHandle(
+ String method, String data, String databaseName, Map<String,
String> parameters)
throws Exception {
if (databaseStore.containsKey(databaseName)) {
- if (Objects.nonNull(request)
- && Objects.nonNull(request.getMethod())
- && Objects.nonNull(request.getRequestUrl())) {
- switch (request.getMethod()) {
- case "GET":
- List<String> tables = listTables(databaseName);
- return generateFinalListTablesResponse(request,
tables);
- case "POST":
- CreateTableRequest requestBody =
- OBJECT_MAPPER.readValue(
- request.getBody().readUtf8(),
CreateTableRequest.class);
- Identifier identifier = requestBody.getIdentifier();
- Schema schema = requestBody.getSchema();
- TableMetadata tableMetadata;
- if (isFormatTable(schema)) {
- tableMetadata = createFormatTable(identifier,
schema);
- } else {
- catalog.createTable(identifier, schema, false);
- tableMetadata =
- createTableMetadata(
- requestBody.getIdentifier(),
- 1L,
- requestBody.getSchema(),
- UUID.randomUUID().toString(),
- false);
- }
- tableMetadataStore.put(
- requestBody.getIdentifier().getFullName(),
tableMetadata);
- return new MockResponse().setResponseCode(200);
- default:
- return new MockResponse().setResponseCode(404);
- }
- } else {
- return mockResponse(
- new ErrorResponse(
- ErrorResponseResourceType.TABLE,
- null,
- "invalid input " + request,
- 400),
- 400);
+ switch (method) {
+ case "GET":
+ List<String> tables = listTables(databaseName);
+ return generateFinalListTablesResponse(parameters, tables);
+ case "POST":
+ CreateTableRequest requestBody =
+ OBJECT_MAPPER.readValue(data,
CreateTableRequest.class);
+ Identifier identifier = requestBody.getIdentifier();
+ Schema schema = requestBody.getSchema();
+ TableMetadata tableMetadata;
+ if (isFormatTable(schema)) {
+ tableMetadata = createFormatTable(identifier, schema);
+ } else {
+ catalog.createTable(identifier, schema, false);
+ tableMetadata =
+ createTableMetadata(
+ requestBody.getIdentifier(),
+ 1L,
+ requestBody.getSchema(),
+ UUID.randomUUID().toString(),
+ false);
+ }
+ tableMetadataStore.put(
+ requestBody.getIdentifier().getFullName(),
tableMetadata);
+ return new MockResponse().setResponseCode(200);
+ default:
+ return new MockResponse().setResponseCode(404);
}
}
return mockResponse(
@@ -697,31 +729,26 @@ public class RESTCatalogServer {
}
private MockResponse generateFinalListTablesResponse(
- RecordedRequest request, List<String> tables) {
+ Map<String, String> parameters, List<String> tables) {
RESTResponse response;
if (!tables.isEmpty()) {
int maxResults;
try {
- maxResults = getMaxResults(request);
+ maxResults = getMaxResults(parameters);
} catch (Exception e) {
LOG.error(
"parse maxResults {} to int failed",
- Objects.nonNull(request.getRequestUrl())
- ?
request.getRequestUrl().queryParameter(MAX_RESULTS)
- : null);
+ parameters.getOrDefault(MAX_RESULTS, null));
return mockResponse(
new ErrorResponse(
ErrorResponseResourceType.TABLE,
null,
"invalid input queryParameter maxResults"
- +
request.getRequestUrl().queryParameter(MAX_RESULTS),
+ + parameters.get(MAX_RESULTS),
400),
400);
}
- String pageToken =
- Objects.nonNull(request.getRequestUrl())
- ?
request.getRequestUrl().queryParameter(PAGE_TOKEN)
- : null;
+ String pageToken = parameters.getOrDefault(PAGE_TOKEN, null);
List<String> sortedTables =
tables.stream().sorted(String::compareTo).collect(Collectors.toList());
@@ -754,52 +781,42 @@ public class RESTCatalogServer {
return new ListTablesResponse(pagedTables, nextPageToken);
}
- private MockResponse tableDetailsHandle(RecordedRequest request, String
databaseName) {
+ private MockResponse tableDetailsHandle(Map<String, String> parameters,
String databaseName) {
RESTResponse response;
- if (Objects.nonNull(request)
- && "GET".equals(request.getMethod())
- && Objects.nonNull(request.getRequestUrl())) {
-
- List<GetTableResponse> tableDetails =
listTableDetails(databaseName);
- if (!tableDetails.isEmpty()) {
- int maxResults;
- try {
- maxResults = getMaxResults(request);
- } catch (Exception e) {
- LOG.error(
- "parse maxResults {} to int failed",
- Objects.nonNull(request.getRequestUrl())
- ?
request.getRequestUrl().queryParameter(MAX_RESULTS)
- : null);
- return mockResponse(
- new ErrorResponse(
- ErrorResponseResourceType.TABLE,
- null,
- "invalid input queryParameter maxResults"
- +
request.getRequestUrl().queryParameter(MAX_RESULTS),
- 400),
- 400);
- }
- String pageToken =
request.getRequestUrl().queryParameter(PAGE_TOKEN);
- List<GetTableResponse> sortedTableDetails =
- tableDetails.stream()
-
.sorted(Comparator.comparing(GetTableResponse::getName))
- .collect(Collectors.toList());
+ List<GetTableResponse> tableDetails = listTableDetails(databaseName);
+ if (!tableDetails.isEmpty()) {
+ int maxResults;
+ try {
+ maxResults = getMaxResults(parameters);
+ } catch (Exception e) {
+ LOG.error(
+ "parse maxResults {} to int failed",
+ parameters.getOrDefault(MAX_RESULTS, null));
+ return mockResponse(
+ new ErrorResponse(
+ ErrorResponseResourceType.TABLE,
+ null,
+ "invalid input queryParameter maxResults"
+ + parameters.get(MAX_RESULTS),
+ 400),
+ 400);
+ }
+ String pageToken = parameters.get(PAGE_TOKEN);
+ List<GetTableResponse> sortedTableDetails =
+ tableDetails.stream()
+
.sorted(Comparator.comparing(GetTableResponse::getName))
+ .collect(Collectors.toList());
- if (maxResults > sortedTableDetails.size() && pageToken ==
null) {
- response = new
ListTableDetailsResponse(sortedTableDetails, null);
- } else {
- response =
- generateListTableDetailsResponse(
- sortedTableDetails, maxResults, pageToken);
- }
+ if (maxResults > sortedTableDetails.size() && pageToken == null) {
+ response = new ListTableDetailsResponse(sortedTableDetails,
null);
} else {
- response = new
ListTableDetailsResponse(Collections.emptyList(), null);
+ response =
+ generateListTableDetailsResponse(sortedTableDetails,
maxResults, pageToken);
}
- return mockResponse(response, 200);
} else {
- return new MockResponse().setResponseCode(404);
+ response = new ListTableDetailsResponse(Collections.emptyList(),
null);
}
+ return mockResponse(response, 200);
}
private List<GetTableResponse> listTableDetails(String databaseName) {
@@ -842,13 +859,13 @@ public class RESTCatalogServer {
return Options.fromMap(schema.options()).get(TYPE) == FORMAT_TABLE;
}
- private MockResponse tableHandle(RecordedRequest request, Identifier
identifier)
+ private MockResponse tableHandle(String method, String data, Identifier
identifier)
throws Exception {
RESTResponse response;
if (noPermissionTables.contains(identifier.getFullName())) {
throw new Catalog.TableNoPermissionException(identifier);
}
- switch (request.getMethod()) {
+ switch (method) {
case "GET":
TableMetadata tableMetadata;
identifier.isSystemTable();
@@ -870,8 +887,7 @@ public class RESTCatalogServer {
return mockResponse(response, 200);
case "POST":
AlterTableRequest requestBody =
- OBJECT_MAPPER.readValue(
- request.getBody().readUtf8(),
AlterTableRequest.class);
+ OBJECT_MAPPER.readValue(data, AlterTableRequest.class);
alterTableImpl(identifier, requestBody.getChanges());
return new MockResponse().setResponseCode(200);
case "DELETE":
@@ -887,9 +903,8 @@ public class RESTCatalogServer {
}
}
- private MockResponse renameTableHandle(RecordedRequest request) throws
Exception {
- RenameTableRequest requestBody =
- OBJECT_MAPPER.readValue(request.getBody().readUtf8(),
RenameTableRequest.class);
+ private MockResponse renameTableHandle(String data) throws Exception {
+ RenameTableRequest requestBody = OBJECT_MAPPER.readValue(data,
RenameTableRequest.class);
Identifier fromTable = requestBody.getSource();
Identifier toTable = requestBody.getDestination();
if (noPermissionTables.contains(fromTable.getFullName())) {
@@ -910,45 +925,36 @@ public class RESTCatalogServer {
return new MockResponse().setResponseCode(200);
}
- private MockResponse partitionsApiHandle(RecordedRequest request,
Identifier tableIdentifier)
+ private MockResponse partitionsApiHandle(
+ String method, String data, Map<String, String> parameters,
Identifier tableIdentifier)
throws Exception {
- if (Objects.nonNull(request)
- && Objects.nonNull(request.getMethod())
- && Objects.nonNull(request.getRequestUrl())) {
- switch (request.getMethod()) {
- case "GET":
- List<Partition> partitions = new ArrayList<>();
- for (Map.Entry<String, List<Partition>> entry :
- tablePartitionsStore.entrySet()) {
- String tableName =
Identifier.fromString(entry.getKey()).getTableName();
- if (tableName.equals(tableIdentifier.getTableName())) {
- partitions.addAll(entry.getValue());
- }
+ switch (method) {
+ case "GET":
+ List<Partition> partitions = new ArrayList<>();
+ for (Map.Entry<String, List<Partition>> entry :
tablePartitionsStore.entrySet()) {
+ String tableName =
Identifier.fromString(entry.getKey()).getTableName();
+ if (tableName.equals(tableIdentifier.getTableName())) {
+ partitions.addAll(entry.getValue());
}
- return generateFinalListPartitionsResponse(request,
partitions);
- case "POST":
- CreatePartitionsRequest requestBody =
- OBJECT_MAPPER.readValue(
- request.getBody().readUtf8(),
CreatePartitionsRequest.class);
- tablePartitionsStore.put(
- tableIdentifier.getFullName(),
- requestBody.getPartitionSpecs().stream()
- .map(partition ->
spec2Partition(partition))
- .collect(Collectors.toList()));
- return new MockResponse().setResponseCode(200);
- default:
- return new MockResponse().setResponseCode(404);
- }
- } else {
- return mockResponse(
- new ErrorResponse(
- ErrorResponseResourceType.TABLE, null, "invalid
input " + request, 400),
- 400);
+ }
+ return generateFinalListPartitionsResponse(parameters,
partitions);
+ case "POST":
+ CreatePartitionsRequest requestBody =
+ OBJECT_MAPPER.readValue(data,
CreatePartitionsRequest.class);
+ tablePartitionsStore.put(
+ tableIdentifier.getFullName(),
+ requestBody.getPartitionSpecs().stream()
+ .map(partition -> spec2Partition(partition))
+ .collect(Collectors.toList()));
+ return new MockResponse().setResponseCode(200);
+ default:
+ return new MockResponse().setResponseCode(404);
}
}
private MockResponse branchApiHandle(
- String[] resources, RecordedRequest request, Identifier
identifier) throws Exception {
+ String[] resources, String method, String data, Identifier
identifier)
+ throws Exception {
RESTResponse response;
FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
BranchManager branchManager = table.branchManager();
@@ -956,7 +962,7 @@ public class RESTCatalogServer {
String branch = "";
Identifier branchIdentifier;
try {
- switch (request.getMethod()) {
+ switch (method) {
case "DELETE":
branch = RESTUtil.decodeString(resources[4]);
branchIdentifier =
@@ -974,14 +980,12 @@ public class RESTCatalogServer {
case "POST":
if (resources.length == 5) {
ForwardBranchRequest requestBody =
- OBJECT_MAPPER.readValue(
- request.getBody().readUtf8(),
ForwardBranchRequest.class);
+ OBJECT_MAPPER.readValue(data,
ForwardBranchRequest.class);
branch = requestBody.branch();
branchManager.fastForward(requestBody.branch());
} else {
CreateBranchRequest requestBody =
- OBJECT_MAPPER.readValue(
- request.getBody().readUtf8(),
CreateBranchRequest.class);
+ OBJECT_MAPPER.readValue(data,
CreateBranchRequest.class);
branch = requestBody.branch();
if (requestBody.fromTag() == null) {
branchManager.createBranch(requestBody.branch());
@@ -1028,31 +1032,26 @@ public class RESTCatalogServer {
}
private MockResponse generateFinalListPartitionsResponse(
- RecordedRequest request, List<Partition> partitions) {
+ Map<String, String> parameters, List<Partition> partitions) {
RESTResponse response;
if (Objects.nonNull(partitions) && !partitions.isEmpty()) {
int maxResults;
try {
- maxResults = getMaxResults(request);
+ maxResults = getMaxResults(parameters);
} catch (Exception e) {
LOG.error(
"parse maxResults {} to int failed",
- Objects.nonNull(request.getRequestUrl())
- ?
request.getRequestUrl().queryParameter(MAX_RESULTS)
- : null);
+ parameters.getOrDefault(MAX_RESULTS, null));
return mockResponse(
new ErrorResponse(
ErrorResponseResourceType.TABLE,
null,
"invalid input queryParameter maxResults"
- +
request.getRequestUrl().queryParameter(MAX_RESULTS),
+ + parameters.get(MAX_RESULTS),
400),
400);
}
- String pageToken =
- Objects.nonNull(request.getRequestUrl())
- ?
request.getRequestUrl().queryParameter(PAGE_TOKEN)
- : null;
+ String pageToken = parameters.getOrDefault(PAGE_TOKEN, null);
List<Partition> sortedPartitions =
partitions.stream()
@@ -1088,41 +1087,33 @@ public class RESTCatalogServer {
return new ListPartitionsResponse(pagedPartitions, nextPageToken);
}
- private MockResponse viewsHandle(RecordedRequest request, String
databaseName)
+ private MockResponse viewsHandle(
+ String method, String data, String databaseName, Map<String,
String> parameters)
throws Exception {
- if (Objects.nonNull(request)
- && Objects.nonNull(request.getMethod())
- && Objects.nonNull(request.getRequestUrl())) {
- switch (request.getMethod()) {
- case "GET":
- List<String> views = listViews(databaseName);
- return generateFinalListViewsResponse(request, views);
- case "POST":
- CreateViewRequest requestBody =
- OBJECT_MAPPER.readValue(
- request.getBody().readUtf8(),
CreateViewRequest.class);
- Identifier identifier = requestBody.getIdentifier();
- ViewSchema schema = requestBody.getSchema();
- ViewImpl view =
- new ViewImpl(
- requestBody.getIdentifier(),
- schema.fields(),
- schema.query(),
- schema.dialects(),
- schema.comment(),
- schema.options());
- if (viewStore.containsKey(identifier.getFullName())) {
- throw new
Catalog.ViewAlreadyExistException(identifier);
- }
- viewStore.put(identifier.getFullName(), view);
- return new MockResponse().setResponseCode(200);
- default:
- return new MockResponse().setResponseCode(404);
- }
- } else {
- return mockResponse(
- new ErrorResponse(ErrorResponseResourceType.TABLE, null,
"invalid input", 400),
- 400);
+ switch (method) {
+ case "GET":
+ List<String> views = listViews(databaseName);
+ return generateFinalListViewsResponse(parameters, views);
+ case "POST":
+ CreateViewRequest requestBody =
+ OBJECT_MAPPER.readValue(data, CreateViewRequest.class);
+ Identifier identifier = requestBody.getIdentifier();
+ ViewSchema schema = requestBody.getSchema();
+ ViewImpl view =
+ new ViewImpl(
+ requestBody.getIdentifier(),
+ schema.fields(),
+ schema.query(),
+ schema.dialects(),
+ schema.comment(),
+ schema.options());
+ if (viewStore.containsKey(identifier.getFullName())) {
+ throw new Catalog.ViewAlreadyExistException(identifier);
+ }
+ viewStore.put(identifier.getFullName(), view);
+ return new MockResponse().setResponseCode(200);
+ default:
+ return new MockResponse().setResponseCode(404);
}
}
@@ -1135,31 +1126,26 @@ public class RESTCatalogServer {
}
private MockResponse generateFinalListViewsResponse(
- RecordedRequest request, List<String> views) {
+ Map<String, String> parameters, List<String> views) {
RESTResponse response;
if (!views.isEmpty()) {
int maxResults;
try {
- maxResults = getMaxResults(request);
+ maxResults = getMaxResults(parameters);
} catch (Exception e) {
LOG.error(
"parse maxResults {} to int failed",
- Objects.nonNull(request.getRequestUrl())
- ?
request.getRequestUrl().queryParameter(MAX_RESULTS)
- : null);
+ parameters.getOrDefault(MAX_RESULTS, null));
return mockResponse(
new ErrorResponse(
ErrorResponseResourceType.TABLE,
null,
"invalid input queryParameter maxResults"
- +
request.getRequestUrl().queryParameter(MAX_RESULTS),
+ + parameters.get(MAX_RESULTS),
400),
400);
}
- String pageToken =
- Objects.nonNull(request.getRequestUrl())
- ?
request.getRequestUrl().queryParameter(PAGE_TOKEN)
- : null;
+ String pageToken = parameters.getOrDefault(PAGE_TOKEN, null);
List<String> sortedViews =
views.stream().sorted(String::compareTo).collect(Collectors.toList());
@@ -1193,37 +1179,31 @@ public class RESTCatalogServer {
return new ListViewsResponse(pagedViews, nextPageToken);
}
- private MockResponse viewDetailsHandle(RecordedRequest request, String
databaseName) {
+ private MockResponse viewDetailsHandle(
+ String method, String databaseName, Map<String, String>
parameters) {
RESTResponse response;
- if (Objects.nonNull(request)
- && "GET".equals(request.getMethod())
- && Objects.nonNull(request.getRequestUrl())) {
+ if ("GET".equals(method)) {
List<GetViewResponse> viewDetails = listViewDetails(databaseName);
if (!viewDetails.isEmpty()) {
int maxResults;
try {
- maxResults = getMaxResults(request);
+ maxResults = getMaxResults(parameters);
} catch (Exception e) {
LOG.error(
"parse maxResults {} to int failed",
- Objects.nonNull(request.getRequestUrl())
- ?
request.getRequestUrl().queryParameter(MAX_RESULTS)
- : null);
+ parameters.getOrDefault(MAX_RESULTS, null));
return mockResponse(
new ErrorResponse(
ErrorResponseResourceType.TABLE,
null,
"invalid input queryParameter maxResults"
- +
request.getRequestUrl().queryParameter(MAX_RESULTS),
+ + parameters.get(MAX_RESULTS),
400),
400);
}
- String pageToken =
- Objects.nonNull(request.getRequestUrl())
- ?
request.getRequestUrl().queryParameter(PAGE_TOKEN)
- : null;
+ String pageToken = parameters.getOrDefault(PAGE_TOKEN, null);
List<GetViewResponse> sortedViewDetails =
viewDetails.stream()
@@ -1284,11 +1264,10 @@ public class RESTCatalogServer {
return new ListViewDetailsResponse(pagedViewDetails, nextPageToken);
}
- private MockResponse viewHandle(RecordedRequest request, Identifier
identifier)
- throws Exception {
+ private MockResponse viewHandle(String method, Identifier identifier)
throws Exception {
RESTResponse response;
if (viewStore.containsKey(identifier.getFullName())) {
- switch (request.getMethod()) {
+ switch (method) {
case "GET":
if (viewStore.containsKey(identifier.getFullName())) {
View view = viewStore.get(identifier.getFullName());
@@ -1313,9 +1292,8 @@ public class RESTCatalogServer {
throw new Catalog.ViewNotExistException(identifier);
}
- private MockResponse renameViewHandle(RecordedRequest request) throws
Exception {
- RenameTableRequest requestBody =
- OBJECT_MAPPER.readValue(request.getBody().readUtf8(),
RenameTableRequest.class);
+ private MockResponse renameViewHandle(String data) throws Exception {
+ RenameTableRequest requestBody = OBJECT_MAPPER.readValue(data,
RenameTableRequest.class);
Identifier fromView = requestBody.getSource();
Identifier toView = requestBody.getDestination();
if (!viewStore.containsKey(fromView.getFullName())) {
@@ -1383,10 +1361,10 @@ public class RESTCatalogServer {
}
}
- private MockResponse dropPartitionsHandle(Identifier identifier,
RecordedRequest request)
+ private MockResponse dropPartitionsHandle(Identifier identifier, String
data)
throws Catalog.TableNotExistException, JsonProcessingException {
DropPartitionsRequest dropPartitionsRequest =
- OBJECT_MAPPER.readValue(request.getBody().readUtf8(),
DropPartitionsRequest.class);
+ OBJECT_MAPPER.readValue(data, DropPartitionsRequest.class);
List<Map<String, String>> partitionSpecs =
dropPartitionsRequest.getPartitionSpecs();
if (tableMetadataStore.containsKey(identifier.getFullName())) {
List<Partition> existPartitions =
tablePartitionsStore.get(identifier.getFullName());
@@ -1413,12 +1391,11 @@ public class RESTCatalogServer {
}
}
- private MockResponse alterPartitionsHandle(Identifier identifier,
RecordedRequest request)
+ private MockResponse alterPartitionsHandle(Identifier identifier, String
data)
throws Catalog.TableNotExistException, JsonProcessingException {
if (tableMetadataStore.containsKey(identifier.getFullName())) {
AlterPartitionsRequest alterPartitionsRequest =
- OBJECT_MAPPER.readValue(
- request.getBody().readUtf8(),
AlterPartitionsRequest.class);
+ OBJECT_MAPPER.readValue(data,
AlterPartitionsRequest.class);
List<Partition> partitions =
alterPartitionsRequest.getPartitions();
List<Partition> existPartitions =
tablePartitionsStore.get(identifier.getFullName());
partitions.forEach(
@@ -1508,8 +1485,8 @@ public class RESTCatalogServer {
throw new Catalog.TableNotExistException(identifier);
}
- private static int getMaxResults(RecordedRequest request) {
- String strMaxResults =
request.getRequestUrl().queryParameter(MAX_RESULTS);
+ private static int getMaxResults(Map<String, String> parameters) {
+ String strMaxResults = parameters.get(MAX_RESULTS);
Integer maxResults =
Objects.nonNull(strMaxResults) ?
Integer.parseInt(strMaxResults) : null;
if (Objects.isNull(maxResults) || maxResults <= 0) {
@@ -1567,4 +1544,17 @@ public class RESTCatalogServer {
private String getPartitionSortKey(Partition partition) {
return partition.spec().toString().replace("{", "").replace("}", "");
}
+
+ private Map<String, String> getParameters(String query) {
+ Map<String, String> parameters =
+ Arrays.stream(query.split("&"))
+ .map(pair -> pair.split("=", 2))
+ .collect(
+ Collectors.toMap(
+ pair -> pair[0].trim(), // key
+ pair ->
RESTUtil.decodeString(pair[1].trim()), // value
+ (existing, replacement) -> existing //
handle duplicates
+ ));
+ return parameters;
+ }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index 5e47bbe7b6..ba8e987930 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -32,8 +32,11 @@ import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.rest.auth.AuthProvider;
import org.apache.paimon.rest.auth.AuthProviderEnum;
import org.apache.paimon.rest.auth.BearTokenAuthProvider;
+import org.apache.paimon.rest.auth.DLFAuthProvider;
+import org.apache.paimon.rest.auth.DLFToken;
import org.apache.paimon.rest.auth.RESTAuthParameter;
import org.apache.paimon.rest.exceptions.NotAuthorizedException;
import org.apache.paimon.rest.responses.ConfigResponse;
@@ -57,12 +60,16 @@ import
org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;
+import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -78,6 +85,7 @@ import static
org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE;
import static org.apache.paimon.CoreOptions.METASTORE_TAG_TO_PARTITION;
import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME;
import static org.apache.paimon.rest.RESTCatalog.PAGE_TOKEN;
+import static org.apache.paimon.rest.auth.DLFAuthProvider.TOKEN_DATE_FORMATTER;
import static
org.apache.paimon.utils.SnapshotManagerTest.createSnapshotWithMillis;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
@@ -97,6 +105,7 @@ class RESTCatalogTest extends CatalogTestBase {
private Options options = new Options();
private String dataPath;
private RESTCatalog restCatalog;
+ private AuthProvider authProvider;
@BeforeEach
@Override
@@ -114,7 +123,9 @@ class RESTCatalogTest extends CatalogTestBase {
CatalogOptions.WAREHOUSE.key(),
restWarehouse),
ImmutableMap.of());
- restCatalogServer = new RESTCatalogServer(dataPath, initToken,
this.config, restWarehouse);
+ this.authProvider = new BearTokenAuthProvider(initToken);
+ restCatalogServer =
+ new RESTCatalogServer(dataPath, authProvider, this.config,
restWarehouse);
restCatalogServer.start();
options.set(CatalogOptions.WAREHOUSE.key(), restWarehouse);
options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl());
@@ -140,6 +151,51 @@ class RESTCatalogTest extends CatalogTestBase {
.isInstanceOf(NotAuthorizedException.class);
}
+ @Test
+ void testDlfStSTokenAuth() throws Exception {
+ String restWarehouse = UUID.randomUUID().toString();
+ String akId = "akId" + UUID.randomUUID();
+ String akSecret = "akSecret" + UUID.randomUUID();
+ String securityToken = "securityToken" + UUID.randomUUID();
+ String region = "cn-hangzhou";
+ DLFAuthProvider authProvider =
+ DLFAuthProvider.buildAKToken(akId, akSecret, securityToken,
region);
+ restCatalogServer =
+ new RESTCatalogServer(dataPath, authProvider, this.config,
restWarehouse);
+ restCatalogServer.start();
+ options.set(CatalogOptions.WAREHOUSE.key(), restWarehouse);
+ options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl());
+ options.set(RESTCatalogOptions.TOKEN_PROVIDER,
AuthProviderEnum.DLF.identifier());
+ options.set(RESTCatalogOptions.DLF_REGION, region);
+ options.set(RESTCatalogOptions.DLF_ACCESS_KEY_ID, akId);
+ options.set(RESTCatalogOptions.DLF_ACCESS_KEY_SECRET, akSecret);
+ options.set(RESTCatalogOptions.DLF_SECURITY_TOKEN, securityToken);
+ RESTCatalog restCatalog = new
RESTCatalog(CatalogContext.create(options));
+ testDlfAuth(restCatalog);
+ }
+
+ @Test
+ void testDlfStSTokenPathAuth() throws Exception {
+ String restWarehouse = UUID.randomUUID().toString();
+ String region = "cn-hangzhou";
+ String tokenPath = dataPath + UUID.randomUUID();
+ generateTokenAndWriteToFile(tokenPath);
+ DLFAuthProvider authProvider =
+ DLFAuthProvider.buildRefreshToken(tokenPath, 1000_000L,
region);
+ restCatalogServer =
+ new RESTCatalogServer(dataPath, authProvider, this.config,
restWarehouse);
+ restCatalogServer.start();
+ options.set(CatalogOptions.WAREHOUSE.key(), restWarehouse);
+ options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl());
+ options.set(RESTCatalogOptions.TOKEN_PROVIDER,
AuthProviderEnum.DLF.identifier());
+ options.set(RESTCatalogOptions.DLF_REGION, region);
+ options.set(RESTCatalogOptions.DLF_TOKEN_PATH, tokenPath);
+ RESTCatalog restCatalog = new
RESTCatalog(CatalogContext.create(options));
+ testDlfAuth(restCatalog);
+ File file = new File(tokenPath);
+ file.delete();
+ }
+
@Test
void testHeader() {
Map<String, String> parameters = new HashMap<>();
@@ -1034,4 +1090,30 @@ class RESTCatalogTest extends CatalogTestBase {
});
return result;
}
+
+ private void generateTokenAndWriteToFile(String tokenPath) throws
IOException {
+ File tokenFile = new File(tokenPath);
+ ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
+ String expiration = now.format(TOKEN_DATE_FORMATTER);
+ String secret = UUID.randomUUID().toString();
+ DLFToken token = new DLFToken("accessKeyId", secret, "securityToken",
expiration);
+ String tokenStr =
RESTObjectMapper.OBJECT_MAPPER.writeValueAsString(token);
+ FileUtils.writeStringToFile(tokenFile, tokenStr);
+ }
+
+ private void testDlfAuth(RESTCatalog restCatalog) throws Exception {
+ String databaseName = "db1";
+ restCatalog.createDatabase(databaseName, true);
+ String[] tableNames = {"dt=20230101", "dt=20230102", "dt=20230103"};
+ for (String tableName : tableNames) {
+ restCatalog.createTable(
+ Identifier.create(databaseName, tableName),
DEFAULT_TABLE_SCHEMA, false);
+ }
+ PagedList<String> listTablesPaged =
+ restCatalog.listTablesPaged(databaseName, 1, "dt=20230101");
+ PagedList<String> listTablesPaged2 =
+ restCatalog.listTablesPaged(databaseName, 1,
listTablesPaged.getNextPageToken());
+ assertEquals(listTablesPaged.getElements().get(0), "dt=20230102");
+ assertEquals(listTablesPaged2.getElements().get(0), "dt=20230103");
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
index 5b4f5eae21..ee5907c3dd 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
@@ -26,7 +26,9 @@ import org.apache.paimon.rest.RESTCatalogServer;
import org.apache.paimon.rest.RESTFileIOTestLoader;
import org.apache.paimon.rest.RESTTestFileIO;
import org.apache.paimon.rest.RESTToken;
+import org.apache.paimon.rest.auth.AuthProvider;
import org.apache.paimon.rest.auth.AuthProviderEnum;
+import org.apache.paimon.rest.auth.BearTokenAuthProvider;
import org.apache.paimon.rest.responses.ConfigResponse;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
@@ -74,7 +76,8 @@ class RESTCatalogITCase extends CatalogITCaseBase {
CatalogOptions.WAREHOUSE.key(),
warehouse),
ImmutableMap.of());
- restCatalogServer = new RESTCatalogServer(dataPath, initToken, config,
warehouse);
+ AuthProvider authProvider = new BearTokenAuthProvider(initToken);
+ restCatalogServer = new RESTCatalogServer(dataPath, authProvider,
config, warehouse);
restCatalogServer.start();
serverUrl = restCatalogServer.getUrl();
super.before();
diff --git
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java
index d15e14c7d5..0748cf9d08 100644
---
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java
+++
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java
@@ -21,7 +21,9 @@ package org.apache.paimon.spark;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.rest.RESTCatalogInternalOptions;
import org.apache.paimon.rest.RESTCatalogServer;
+import org.apache.paimon.rest.auth.AuthProvider;
import org.apache.paimon.rest.auth.AuthProviderEnum;
+import org.apache.paimon.rest.auth.BearTokenAuthProvider;
import org.apache.paimon.rest.responses.ConfigResponse;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
@@ -59,7 +61,8 @@ public class SparkCatalogWithRestTest {
CatalogOptions.WAREHOUSE.key(),
warehouse),
ImmutableMap.of());
- restCatalogServer = new RESTCatalogServer(dataPath, initToken, config,
warehouse);
+ AuthProvider authProvider = new BearTokenAuthProvider(initToken);
+ restCatalogServer = new RESTCatalogServer(dataPath, authProvider,
config, warehouse);
restCatalogServer.start();
serverUrl = restCatalogServer.getUrl();
}