This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 48dce34f35 [core] RESTCatalog: support data token IT test (#5184)
48dce34f35 is described below
commit 48dce34f357f84de5190c7529bfc26fb04166795
Author: jerry <[email protected]>
AuthorDate: Tue Mar 4 16:11:36 2025 +0800
[core] RESTCatalog: support data token IT test (#5184)
---
.../org/apache/paimon/rest/DataTokenProvider.java | 61 --------
.../org/apache/paimon/rest/DataTokenStore.java | 55 ++++++++
.../org/apache/paimon/rest/RESTCatalogServer.java | 153 +++++++++++++--------
.../org/apache/paimon/rest/RESTCatalogTest.java | 147 ++++++++++++++------
.../apache/paimon/rest/RESTFileIOTestLoader.java | 41 ++++++
.../org/apache/paimon/rest/RESTTestFileIO.java | 129 +++++++++++++++++
.../services/org.apache.paimon.fs.FileIOLoader | 1 +
.../org/apache/paimon/flink/RESTCatalogITCase.java | 84 +++++++++--
.../paimon/spark/SparkCatalogWithRestTest.java | 23 +++-
9 files changed, 519 insertions(+), 175 deletions(-)
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/DataTokenProvider.java
b/paimon-core/src/test/java/org/apache/paimon/rest/DataTokenProvider.java
deleted file mode 100644
index 081ada0fba..0000000000
--- a/paimon-core/src/test/java/org/apache/paimon/rest/DataTokenProvider.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.rest;
-
-import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
-
-import java.util.Map;
-
-/** Refresh data token in test mode. */
-public class DataTokenProvider {
-
- private Map<String, String> token;
- private long expiresAtMillis;
-
- public DataTokenProvider(Map<String, String> token, long expiresAtMillis) {
- this.token = token;
- this.expiresAtMillis = expiresAtMillis;
- }
-
- public void setExpiresAtMillis(long expiresAtMillis) {
- this.expiresAtMillis = expiresAtMillis;
- }
-
- public Map<String, String> getToken() {
- return token;
- }
-
- public long getExpiresAtMillis() {
- return expiresAtMillis;
- }
-
- public void setToken(Map<String, String> token) {
- this.token = token;
- }
-
- public void refresh() {
- this.token =
- ImmutableMap.of(
- "ak",
- "ak-" + System.currentTimeMillis(),
- "sk",
- "sk-" + System.currentTimeMillis());
- this.expiresAtMillis = System.currentTimeMillis();
- }
-}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/DataTokenStore.java
b/paimon-core/src/test/java/org/apache/paimon/rest/DataTokenStore.java
new file mode 100644
index 0000000000..87eb470028
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/DataTokenStore.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** DataTokenStore is used to store data token. */
+public class DataTokenStore {
+
+ // as warehouse means one catalog instance, so we use warehouse as key to
store data token
+ private static final Map<String, Map<String, RESTToken>>
warehouse2DataTokenStore =
+ new HashMap<>();
+
+ public static void putDataToken(String warehouse, String tableFullName,
RESTToken dataToken) {
+ Map<String, RESTToken> dataTokenStore =
warehouse2DataTokenStore.get(warehouse);
+ if (dataTokenStore == null) {
+ dataTokenStore = new HashMap<>();
+ warehouse2DataTokenStore.put(warehouse, dataTokenStore);
+ }
+ dataTokenStore.put(tableFullName, dataToken);
+ }
+
+ public static RESTToken getDataToken(String warehouse, String
tableFullName) {
+ Map<String, RESTToken> dataTokenStore =
warehouse2DataTokenStore.get(warehouse);
+ if (dataTokenStore == null) {
+ return null;
+ }
+ return dataTokenStore.get(tableFullName);
+ }
+
+ public static void removeDataToken(String warehouse, String tableFullName)
{
+ Map<String, RESTToken> dataTokenStore =
warehouse2DataTokenStore.get(warehouse);
+ if (dataTokenStore != null &&
dataTokenStore.containsKey(tableFullName)) {
+ dataTokenStore.remove(tableFullName);
+ warehouse2DataTokenStore.put(warehouse, dataTokenStore);
+ }
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index 30d7b0a598..4f96d0727d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -27,9 +27,13 @@ import org.apache.paimon.catalog.FileSystemCatalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.catalog.PropertyChange;
import org.apache.paimon.catalog.RenamingSnapshotCommit;
+import org.apache.paimon.catalog.SupportsBranches;
+import org.apache.paimon.catalog.SupportsSnapshots;
import org.apache.paimon.catalog.TableMetadata;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.fs.local.LocalFileIOLoader;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
@@ -50,6 +54,7 @@ import
org.apache.paimon.rest.requests.MarkDonePartitionsRequest;
import org.apache.paimon.rest.requests.RenameTableRequest;
import org.apache.paimon.rest.responses.AlterDatabaseResponse;
import org.apache.paimon.rest.responses.CommitTableResponse;
+import org.apache.paimon.rest.responses.ConfigResponse;
import org.apache.paimon.rest.responses.CreateDatabaseResponse;
import org.apache.paimon.rest.responses.ErrorResponse;
import org.apache.paimon.rest.responses.ErrorResponseResourceType;
@@ -66,7 +71,9 @@ import org.apache.paimon.rest.responses.ListViewsResponse;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.view.View;
@@ -101,8 +108,8 @@ import static
org.apache.paimon.rest.RESTObjectMapper.OBJECT_MAPPER;
/** Mock REST server for testing. */
public class RESTCatalogServer {
- private static final String PREFIX = "paimon";
- private static final String DATABASE_URI =
String.format("/v1/%s/databases", PREFIX);
+ private final String prefix;
+ private final String databaseUri;
private final FileSystemCatalog catalog;
private final Dispatcher dispatcher;
@@ -114,23 +121,34 @@ public class RESTCatalogServer {
public final Map<String, List<Partition>> tablePartitionsStore = new
HashMap<>();
public final Map<String, View> viewStore = new HashMap<>();
public final Map<String, Snapshot> tableSnapshotStore = new HashMap<>();
- Map<String, RESTToken> dataTokenStore = new HashMap<>();
-
- public RESTCatalogServer(String warehouse, String initToken) {
+ public final ConfigResponse configResponse;
+ public final String warehouse;
+
+ private ResourcePaths resourcePaths;
+
+ public RESTCatalogServer(
+ String dataPath, String initToken, ConfigResponse config, String
warehouse) {
+ this.warehouse = warehouse;
+ this.configResponse = config;
+ this.prefix =
+
this.configResponse.getDefaults().get(RESTCatalogInternalOptions.PREFIX.key());
+ ResourcePaths resourcePaths = new ResourcePaths(prefix);
+ this.databaseUri = resourcePaths.databases();
authToken = initToken;
Options conf = new Options();
- conf.setString("warehouse", warehouse);
+ this.configResponse.getDefaults().forEach((k, v) -> conf.setString(k,
v));
+ conf.setString(CatalogOptions.WAREHOUSE.key(), dataPath);
CatalogContext context = CatalogContext.create(conf);
- Path warehousePath = new Path(warehouse);
+ Path warehousePath = new Path(dataPath);
FileIO fileIO;
try {
- fileIO = FileIO.get(warehousePath, context);
+ fileIO = new LocalFileIO();
fileIO.checkOrMkdirs(warehousePath);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
this.catalog = new FileSystemCatalog(fileIO, warehousePath,
context.options());
- this.dispatcher = initDispatcher(warehouse, authToken);
+ this.dispatcher = initDispatcher(authToken);
MockWebServer mockWebServer = new MockWebServer();
mockWebServer.setDispatcher(dispatcher);
server = mockWebServer;
@@ -153,10 +171,18 @@ public class RESTCatalogServer {
}
public void setDataToken(Identifier identifier, RESTToken token) {
- dataTokenStore.put(identifier.getFullName(), token);
+ DataTokenStore.putDataToken(warehouse, identifier.getFullName(),
token);
+ }
+
+ public void removeDataToken(Identifier identifier) {
+ DataTokenStore.removeDataToken(warehouse, identifier.getFullName());
}
- public Dispatcher initDispatcher(String warehouse, String authToken) {
+ public RESTToken getDataToken(Identifier identifier) {
+ return DataTokenStore.getDataToken(warehouse,
identifier.getFullName());
+ }
+
+ public Dispatcher initDispatcher(String authToken) {
return new Dispatcher() {
@Override
public MockResponse dispatch(RecordedRequest request) {
@@ -167,23 +193,14 @@ public class RESTCatalogServer {
if (!("Bearer " + authToken).equals(token)) {
return new MockResponse().setResponseCode(401);
}
- if (request.getPath().startsWith("/v1/config")) {
- String body =
- String.format(
- "{\"defaults\": {\"%s\": \"%s\",
\"%s\": \"%s\", \"%s\": \"%s\"}}",
-
RESTCatalogInternalOptions.PREFIX.key(),
- PREFIX,
- CatalogOptions.WAREHOUSE.key(),
- warehouse,
- "header.test-header",
- "test-value");
- return new
MockResponse().setResponseCode(200).setBody(body);
- } else if (DATABASE_URI.equals(request.getPath())) {
+ if
(request.getPath().equals(resourcePaths.config(warehouse))) {
+ return mockResponse(configResponse, 200);
+ } else if (databaseUri.equals(request.getPath())) {
return databasesApiHandler(request);
- } else if (request.getPath().startsWith(DATABASE_URI)) {
+ } else if (request.getPath().startsWith(databaseUri)) {
String[] resources =
request.getPath()
- .substring((DATABASE_URI +
"/").length())
+ .substring((databaseUri +
"/").length())
.split("/");
String databaseName = resources[0];
if (!databaseStore.containsKey(databaseName)) {
@@ -275,7 +292,7 @@ public class RESTCatalogServer {
identifier,
markDonePartitionsRequest.getPartitionSpecs());
return new MockResponse().setResponseCode(200);
} else if (isPartitions) {
- return partitionsApiHandler(request, identifier);
+ return partitionsApiHandle(request, identifier);
} else if (isBranches) {
FileStoreTable table = (FileStoreTable)
catalog.getTable(identifier);
BranchManager branchManager =
table.branchManager();
@@ -312,25 +329,25 @@ public class RESTCatalogServer {
return new
MockResponse().setResponseCode(404);
}
} else if (isTableToken) {
- return handleDataToken(identifier);
+ return getDataTokenHandle(identifier);
} else if (isTableSnapshot) {
- return handleSnapshot(identifier);
+ return snapshotHandle(identifier);
} else if (isTableRename) {
- return renameTableApiHandler(request);
+ return renameTableHandle(request);
} else if (isTableCommit) {
- return commitTableApiHandler(request);
+ return commitTableHandle(request);
} else if (isTable) {
- return tableApiHandler(request, identifier);
+ return tableHandle(request, identifier);
} else if (isTables) {
- return tablesApiHandler(request, databaseName);
+ return tablesHandle(request, databaseName);
} else if (isViews) {
- return viewsApiHandler(request, databaseName);
+ return viewsHandle(request, databaseName);
} else if (isViewRename) {
- return renameViewApiHandler(request);
+ return renameViewHandle(request);
} else if (isView) {
- return viewApiHandler(request, identifier);
+ return viewHandle(request, identifier);
} else {
- return databaseApiHandler(request, databaseName);
+ return databaseHandle(request, databaseName);
}
}
return new MockResponse().setResponseCode(404);
@@ -421,12 +438,10 @@ public class RESTCatalogServer {
};
}
- private MockResponse handleDataToken(Identifier tableIdentifier) throws
Exception {
- RESTToken dataToken;
- if (dataTokenStore.containsKey(tableIdentifier.getFullName())) {
- dataToken = dataTokenStore.get(tableIdentifier.getFullName());
- } else {
- long currentTimeMillis = System.currentTimeMillis();
+ private MockResponse getDataTokenHandle(Identifier tableIdentifier) throws
Exception {
+ RESTToken dataToken = getDataToken(tableIdentifier);
+ if (dataToken == null) {
+ long currentTimeMillis = System.currentTimeMillis() + 60_000;
dataToken =
new RESTToken(
ImmutableMap.of(
@@ -435,7 +450,7 @@ public class RESTCatalogServer {
"akSecret",
"akSecret" + currentTimeMillis),
currentTimeMillis);
- dataTokenStore.put(tableIdentifier.getFullName(), dataToken);
+ DataTokenStore.putDataToken(warehouse,
tableIdentifier.getFullName(), dataToken);
}
GetTableTokenResponse getTableTokenResponse =
new GetTableTokenResponse(dataToken.token(),
dataToken.expireAtMillis());
@@ -444,7 +459,7 @@ public class RESTCatalogServer {
.setBody(OBJECT_MAPPER.writeValueAsString(getTableTokenResponse));
}
- private MockResponse handleSnapshot(Identifier identifier) throws
Exception {
+ private MockResponse snapshotHandle(Identifier identifier) throws
Exception {
RESTResponse response;
Optional<Snapshot> snapshotOptional =
Optional.ofNullable(tableSnapshotStore.get(identifier.getFullName()));
@@ -480,10 +495,11 @@ public class RESTCatalogServer {
new ErrorResponse(ErrorResponseResourceType.TABLE,
null, "", 404), 404));
}
- private MockResponse commitTableApiHandler(RecordedRequest request) throws
Exception {
+ private MockResponse commitTableHandle(RecordedRequest request) throws
Exception {
CommitTableRequest requestBody =
OBJECT_MAPPER.readValue(request.getBody().readUtf8(),
CommitTableRequest.class);
- FileStoreTable table = (FileStoreTable)
catalog.getTable(requestBody.getIdentifier());
+ Identifier identifier = requestBody.getIdentifier();
+ FileStoreTable table = getFileTable(identifier);
RenamingSnapshotCommit commit =
new RenamingSnapshotCommit(table.snapshotManager(),
Lock.empty());
String branchName = requestBody.getIdentifier().getBranchName();
@@ -519,7 +535,7 @@ public class RESTCatalogServer {
}
}
- private MockResponse databaseApiHandler(RecordedRequest request, String
databaseName)
+ private MockResponse databaseHandle(RecordedRequest request, String
databaseName)
throws Exception {
RESTResponse response;
Database database;
@@ -580,7 +596,7 @@ public class RESTCatalogServer {
return new MockResponse().setResponseCode(404);
}
- private MockResponse tablesApiHandler(RecordedRequest request, String
databaseName)
+ private MockResponse tablesHandle(RecordedRequest request, String
databaseName)
throws Exception {
RESTResponse response;
if (databaseStore.containsKey(databaseName)) {
@@ -629,7 +645,7 @@ public class RESTCatalogServer {
return Options.fromMap(schema.options()).get(TYPE) == FORMAT_TABLE;
}
- private MockResponse tableApiHandler(RecordedRequest request, Identifier
identifier)
+ private MockResponse tableHandle(RecordedRequest request, Identifier
identifier)
throws Exception {
RESTResponse response;
if (tableMetadataStore.containsKey(identifier.getFullName())) {
@@ -666,7 +682,7 @@ public class RESTCatalogServer {
}
}
- private MockResponse renameTableApiHandler(RecordedRequest request) throws
Exception {
+ private MockResponse renameTableHandle(RecordedRequest request) throws
Exception {
RenameTableRequest requestBody =
OBJECT_MAPPER.readValue(request.getBody().readUtf8(),
RenameTableRequest.class);
Identifier fromTable = requestBody.getSource();
@@ -684,7 +700,7 @@ public class RESTCatalogServer {
return new MockResponse().setResponseCode(200);
}
- private MockResponse partitionsApiHandler(RecordedRequest request,
Identifier tableIdentifier)
+ private MockResponse partitionsApiHandle(RecordedRequest request,
Identifier tableIdentifier)
throws Exception {
RESTResponse response;
switch (request.getMethod()) {
@@ -708,7 +724,7 @@ public class RESTCatalogServer {
}
}
- private MockResponse viewsApiHandler(RecordedRequest request, String
databaseName)
+ private MockResponse viewsHandle(RecordedRequest request, String
databaseName)
throws Exception {
RESTResponse response;
switch (request.getMethod()) {
@@ -747,7 +763,7 @@ public class RESTCatalogServer {
}
}
- private MockResponse viewApiHandler(RecordedRequest request, Identifier
identifier)
+ private MockResponse viewHandle(RecordedRequest request, Identifier
identifier)
throws Exception {
RESTResponse response;
if (viewStore.containsKey(identifier.getFullName())) {
@@ -776,7 +792,7 @@ public class RESTCatalogServer {
throw new Catalog.ViewNotExistException(identifier);
}
- private MockResponse renameViewApiHandler(RecordedRequest request) throws
Exception {
+ private MockResponse renameViewHandle(RecordedRequest request) throws
Exception {
RenameTableRequest requestBody =
OBJECT_MAPPER.readValue(request.getBody().readUtf8(),
RenameTableRequest.class);
Identifier fromView = requestBody.getSource();
@@ -830,7 +846,7 @@ public class RESTCatalogServer {
private boolean commitSnapshot(
Identifier identifier, Snapshot snapshot, List<Partition>
statistics)
throws Catalog.TableNotExistException {
- FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+ FileStoreTable table = getFileTable(identifier);
RenamingSnapshotCommit commit =
new RenamingSnapshotCommit(table.snapshotManager(),
Lock.empty());
String branchName = identifier.getBranchName();
@@ -923,7 +939,9 @@ public class RESTCatalogServer {
Identifier identifier, long schemaId, Schema schema, String uuid,
boolean isExternal) {
Map<String, String> options = new HashMap<>(schema.options());
Path path = catalog.getTableLocation(identifier);
- options.put(PATH.key(), path.toString());
+ String restPath =
+ path.toString().replaceFirst(LocalFileIOLoader.SCHEME,
RESTFileIOTestLoader.SCHEME);
+ options.put(PATH.key(), restPath);
TableSchema tableSchema =
new TableSchema(
schemaId,
@@ -945,4 +963,27 @@ public class RESTCatalogServer {
// todo: need update
return new Partition(spec, 123, 456, 789, 123);
}
+
+ private FileStoreTable getFileTable(Identifier identifier)
+ throws Catalog.TableNotExistException {
+ if (tableMetadataStore.containsKey(identifier.getFullName())) {
+ TableMetadata tableMetadata =
tableMetadataStore.get(identifier.getFullName());
+ TableSchema schema = tableMetadata.schema();
+ CatalogEnvironment catalogEnv =
+ new CatalogEnvironment(
+ identifier,
+ tableMetadata.uuid(),
+ catalog.catalogLoader(),
+ catalog.lockFactory().orElse(null),
+ catalog.lockContext().orElse(null),
+ catalog instanceof SupportsSnapshots,
+ catalog instanceof SupportsBranches);
+ Path path = new Path(schema.options().get(PATH.key()));
+ FileIO dataFileIO = catalog.fileIO();
+ FileStoreTable table =
+ FileStoreTableFactory.create(dataFileIO, path, schema,
catalogEnv);
+ return table;
+ }
+ throw new Catalog.TableNotExistException(identifier);
+ }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index 92c6db658d..dc9bf62743 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -33,6 +33,7 @@ import org.apache.paimon.rest.auth.AuthProviderEnum;
import org.apache.paimon.rest.auth.BearTokenAuthProvider;
import org.apache.paimon.rest.auth.RESTAuthParameter;
import org.apache.paimon.rest.exceptions.NotAuthorizedException;
+import org.apache.paimon.rest.responses.ConfigResponse;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.BatchTableCommit;
@@ -53,6 +54,8 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -66,20 +69,38 @@ import static
org.apache.paimon.utils.SnapshotManagerTest.createSnapshotWithMill
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
/** Test for REST Catalog. */
class RESTCatalogTest extends CatalogTestBase {
private RESTCatalogServer restCatalogServer;
private String initToken = "init_token";
+ private String serverDefineHeaderName = "test-header";
+ private String serverDefineHeaderValue = "test-value";
+ private ConfigResponse config;
+ private Options options = new Options();
+ private String dataPath;
@BeforeEach
@Override
public void setUp() throws Exception {
super.setUp();
- restCatalogServer = new RESTCatalogServer(warehouse, initToken);
+ dataPath = warehouse;
+ String restWarehouse = UUID.randomUUID().toString();
+ this.config =
+ new ConfigResponse(
+ ImmutableMap.of(
+ RESTCatalogInternalOptions.PREFIX.key(),
+ "paimon",
+ "header." + serverDefineHeaderName,
+ serverDefineHeaderValue,
+ CatalogOptions.WAREHOUSE.key(),
+ restWarehouse),
+ ImmutableMap.of());
+ restCatalogServer = new RESTCatalogServer(dataPath, initToken,
this.config, restWarehouse);
restCatalogServer.start();
- Options options = new Options();
+ options.set(CatalogOptions.WAREHOUSE.key(), restWarehouse);
options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl());
options.set(RESTCatalogOptions.TOKEN, initToken);
options.set(RESTCatalogOptions.TOKEN_PROVIDER,
AuthProviderEnum.BEAR.identifier());
@@ -113,7 +134,7 @@ class RESTCatalogTest extends CatalogTestBase {
Map<String, String> headers = restCatalog.headers(restAuthParameter);
assertEquals(
headers.get(BearTokenAuthProvider.AUTHORIZATION_HEADER_KEY),
"Bearer init_token");
- assertEquals(headers.get("test-header"), "test-value");
+ assertEquals(headers.get(serverDefineHeaderName),
serverDefineHeaderValue);
}
@Test
@@ -150,8 +171,7 @@ class RESTCatalogTest extends CatalogTestBase {
RESTTokenFileIO fileIO = (RESTTokenFileIO) fileStoreTable.fileIO();
RESTToken fileDataToken = fileIO.validToken();
- RESTToken serverDataToken =
-
restCatalogServer.dataTokenStore.get(identifier.getFullName());
+ RESTToken serverDataToken =
restCatalogServer.getDataToken(identifier);
assertEquals(serverDataToken, fileDataToken);
}
}
@@ -183,11 +203,7 @@ class RESTCatalogTest extends CatalogTestBase {
@Test
void testSnapshotFromREST() throws Exception {
- Options options = new Options();
- options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl());
- options.set(RESTCatalogOptions.TOKEN, initToken);
- options.set(RESTCatalogOptions.TOKEN_PROVIDER,
AuthProviderEnum.BEAR.identifier());
- RESTCatalog catalog = new RESTCatalog(CatalogContext.create(options));
+ RESTCatalog catalog = (RESTCatalog) this.catalog;
Identifier hasSnapshotTableIdentifier = Identifier.create("test_db_a",
"my_snapshot_table");
createTable(hasSnapshotTableIdentifier, Maps.newHashMap(),
Lists.newArrayList("col1"));
long id = 10086;
@@ -205,41 +221,60 @@ class RESTCatalogTest extends CatalogTestBase {
}
@Test
- public void testBatchRecordsWrite() throws Exception {
+ public void testDataTokenExpired() throws Exception {
+ this.catalog = initDataTokenCatalog();
+ Identifier identifier =
+ Identifier.create("test_data_token",
"table_for_expired_date_token");
+ createTable(identifier, Maps.newHashMap(), Lists.newArrayList("col1"));
+ RESTToken expiredDataToken =
+ new RESTToken(
+ ImmutableMap.of(
+ "akId", "akId-expire", "akSecret",
UUID.randomUUID().toString()),
+ System.currentTimeMillis() - 100_000);
+ restCatalogServer.setDataToken(identifier, expiredDataToken);
+ FileStoreTable tableTestWrite = (FileStoreTable)
catalog.getTable(identifier);
+ List<Integer> data = Lists.newArrayList(12);
+ Exception exception =
+ assertThrows(UncheckedIOException.class, () ->
batchWrite(tableTestWrite, data));
+ assertEquals(RESTTestFileIO.TOKEN_EXPIRED_MSG,
exception.getCause().getMessage());
+ RESTToken dataToken =
+ new RESTToken(
+ ImmutableMap.of("akId", "akId", "akSecret",
UUID.randomUUID().toString()),
+ System.currentTimeMillis() + 100_000);
+ restCatalogServer.setDataToken(identifier, dataToken);
+ batchWrite(tableTestWrite, data);
+ List<String> actual = batchRead(tableTestWrite);
+ assertThat(actual).containsExactlyInAnyOrder("+I[12]");
+ }
+
+ @Test
+ public void testDataTokenUnExistInServer() throws Exception {
+ this.catalog = initDataTokenCatalog();
+ Identifier identifier =
+ Identifier.create("test_data_token",
"table_for_un_exist_date_token");
+ createTable(identifier, Maps.newHashMap(), Lists.newArrayList("col1"));
+ FileStoreTable tableTestWrite = (FileStoreTable)
catalog.getTable(identifier);
+ RESTTokenFileIO restTokenFileIO = (RESTTokenFileIO)
tableTestWrite.fileIO();
+ List<Integer> data = Lists.newArrayList(12);
+ // as RESTTokenFileIO is lazy so we need to call isObjectStore() to
init fileIO
+ restTokenFileIO.isObjectStore();
+ restCatalogServer.removeDataToken(identifier);
+ Exception exception =
+ assertThrows(UncheckedIOException.class, () ->
batchWrite(tableTestWrite, data));
+ assertEquals(RESTTestFileIO.TOKEN_UN_EXIST_MSG,
exception.getCause().getMessage());
+ }
+ @Test
+ public void testBatchRecordsWrite() throws Exception {
Identifier tableIdentifier = Identifier.create("my_db", "my_table");
createTable(tableIdentifier, Maps.newHashMap(),
Lists.newArrayList("col1"));
FileStoreTable tableTestWrite = (FileStoreTable)
catalog.getTable(tableIdentifier);
-
// write
- BatchWriteBuilder writeBuilder = tableTestWrite.newBatchWriteBuilder();
- BatchTableWrite write = writeBuilder.newWrite();
- GenericRow record1 = GenericRow.of(12);
- GenericRow record2 = GenericRow.of(5);
- GenericRow record3 = GenericRow.of(18);
- write.write(record1);
- write.write(record2);
- write.write(record3);
- List<CommitMessage> messages = write.prepareCommit();
- BatchTableCommit commit = writeBuilder.newCommit();
- commit.commit(messages);
- write.close();
- commit.close();
+ batchWrite(tableTestWrite, Lists.newArrayList(12, 5, 18));
// read
- ReadBuilder readBuilder = tableTestWrite.newReadBuilder();
- List<Split> splits = readBuilder.newScan().plan().splits();
- TableRead read = readBuilder.newRead();
- RecordReader<InternalRow> reader = read.createReader(splits);
- List<String> actual = new ArrayList<>();
- reader.forEachRemaining(
- row -> {
- String rowStr =
- String.format("%s[%d]",
row.getRowKind().shortString(), row.getInt(0));
- actual.add(rowStr);
- });
-
- assertThat(actual).containsExactlyInAnyOrder("+I[5]", "+I[12]",
"+I[18]");
+ List<String> result = batchRead(tableTestWrite);
+ assertThat(result).containsExactlyInAnyOrder("+I[5]", "+I[12]",
"+I[18]");
}
@Test
@@ -299,11 +334,39 @@ class RESTCatalogTest extends CatalogTestBase {
}
private Catalog initDataTokenCatalog() {
- Options options = new Options();
- options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl());
- options.set(RESTCatalogOptions.TOKEN, initToken);
options.set(RESTCatalogOptions.DATA_TOKEN_ENABLED, true);
- options.set(RESTCatalogOptions.TOKEN_PROVIDER,
AuthProviderEnum.BEAR.identifier());
+ options.set(
+ RESTTestFileIO.DATA_PATH_CONF_KEY,
+ dataPath.replaceFirst("file", RESTFileIOTestLoader.SCHEME));
return new RESTCatalog(CatalogContext.create(options));
}
+
+ private void batchWrite(FileStoreTable tableTestWrite, List<Integer> data)
throws Exception {
+ BatchWriteBuilder writeBuilder = tableTestWrite.newBatchWriteBuilder();
+ BatchTableWrite write = writeBuilder.newWrite();
+ for (Integer i : data) {
+ GenericRow record = GenericRow.of(i);
+ write.write(record);
+ }
+ List<CommitMessage> messages = write.prepareCommit();
+ BatchTableCommit commit = writeBuilder.newCommit();
+ commit.commit(messages);
+ write.close();
+ commit.close();
+ }
+
+ private List<String> batchRead(FileStoreTable tableTestWrite) throws
IOException {
+ ReadBuilder readBuilder = tableTestWrite.newReadBuilder();
+ List<Split> splits = readBuilder.newScan().plan().splits();
+ TableRead read = readBuilder.newRead();
+ RecordReader<InternalRow> reader = read.createReader(splits);
+ List<String> result = new ArrayList<>();
+ reader.forEachRemaining(
+ row -> {
+ String rowStr =
+ String.format("%s[%d]",
row.getRowKind().shortString(), row.getInt(0));
+ result.add(rowStr);
+ });
+ return result;
+ }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTFileIOTestLoader.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTFileIOTestLoader.java
new file mode 100644
index 0000000000..770f9afece
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTFileIOTestLoader.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileIOLoader;
+import org.apache.paimon.fs.Path;
+
+/** RESTFileIOTestLoader for testing. */
+public class RESTFileIOTestLoader implements FileIOLoader {
+
+ public static final String SCHEME = "rest-test-file-io";
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public String getScheme() {
+ return SCHEME;
+ }
+
+ @Override
+ public FileIO load(Path path) {
+ return new RESTTestFileIO();
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTTestFileIO.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTTestFileIO.java
new file mode 100644
index 0000000000..58d331faa6
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTTestFileIO.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest;
+
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.FileStatus;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.Options;
+
+import java.io.IOException;
+
+import static org.apache.paimon.rest.RESTCatalogOptions.DATA_TOKEN_ENABLED;
+
+/**
+ * A {@link org.apache.paimon.fs.FileIO} implementation for testing.
+ *
+ * <p>It is used to test the RESTFileIO.
+ */
+public class RESTTestFileIO extends LocalFileIO {
+
+ public static final String TOKEN_UN_EXIST_MSG = "token is null";
+ public static final String TOKEN_EXPIRED_MSG = "token is expired";
+ public static final String DATA_PATH_CONF_KEY = "rest.test.data-path";
+ private Options options;
+
+ @Override
+ public boolean isObjectStore() {
+ return false;
+ }
+
+ @Override
+ public void configure(CatalogContext context) {
+ options = context.options();
+ super.configure(context);
+ }
+
+ @Override
+ public SeekableInputStream newInputStream(Path path) throws IOException {
+ checkDataToken(path);
+ return super.newInputStream(path);
+ }
+
+ @Override
+ public PositionOutputStream newOutputStream(Path path, boolean overwrite)
throws IOException {
+ checkDataToken(path);
+ return super.newOutputStream(path, overwrite);
+ }
+
+ @Override
+ public FileStatus getFileStatus(Path path) throws IOException {
+ checkDataToken(path);
+ return super.getFileStatus(path);
+ }
+
+ @Override
+ public FileStatus[] listStatus(Path path) throws IOException {
+ checkDataToken(path);
+ return super.listStatus(path);
+ }
+
+ @Override
+ public boolean exists(Path path) throws IOException {
+ checkDataToken(path);
+ return super.exists(path);
+ }
+
+ @Override
+ public boolean delete(Path path, boolean recursive) throws IOException {
+ checkDataToken(path);
+ return super.delete(path, recursive);
+ }
+
+ @Override
+ public boolean mkdirs(Path path) throws IOException {
+ checkDataToken(path);
+ return super.mkdirs(path);
+ }
+
+ @Override
+ public boolean rename(Path src, Path dst) throws IOException {
+ checkDataToken(src);
+ return super.rename(src, dst);
+ }
+
+ private void checkDataToken(Path path) throws IOException {
+ boolean isDataTokenEnabled =
options.getOptional(DATA_TOKEN_ENABLED).orElse(false);
+ if (isDataTokenEnabled) {
+ RESTToken token = getToken(path);
+ if (token == null) {
+ throw new IOException(TOKEN_UN_EXIST_MSG);
+ } else if (token.expireAtMillis() < System.currentTimeMillis()) {
+ throw new IOException(TOKEN_EXPIRED_MSG);
+ }
+ }
+ }
+
+ private RESTToken getToken(Path path) {
+ String dataPath = options.get(DATA_PATH_CONF_KEY);
+ String basePath = dataPath.replaceAll(RESTFileIOTestLoader.SCHEME +
"://", "");
+ String filePath = path.toString().split(":")[1].replaceAll(basePath,
"");
+ String[] paths = filePath.split("/");
+ String database = paths[0].replaceAll("\\.db", "");
+ String table = paths[1];
+ return DataTokenStore.getDataToken(
+ options.get(CatalogOptions.WAREHOUSE.key()),
+ Identifier.create(database, table).getFullName());
+ }
+}
diff --git
a/paimon-core/src/test/resources/META-INF/services/org.apache.paimon.fs.FileIOLoader
b/paimon-core/src/test/resources/META-INF/services/org.apache.paimon.fs.FileIOLoader
index e15ffca931..862d2d125a 100644
---
a/paimon-core/src/test/resources/META-INF/services/org.apache.paimon.fs.FileIOLoader
+++
b/paimon-core/src/test/resources/META-INF/services/org.apache.paimon.fs.FileIOLoader
@@ -16,3 +16,4 @@
org.apache.paimon.utils.FailingFileIO$Loader
org.apache.paimon.utils.TraceableFileIO$Loader
org.apache.paimon.iceberg.migrate.IcebergMigrateHadoopMetadataTest$TestFileIOLoader
+org.apache.paimon.rest.RESTFileIOTestLoader
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
index cf66ee425b..5b4f5eae21 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
@@ -18,9 +18,18 @@
package org.apache.paimon.flink;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.rest.RESTCatalogInternalOptions;
import org.apache.paimon.rest.RESTCatalogOptions;
import org.apache.paimon.rest.RESTCatalogServer;
+import org.apache.paimon.rest.RESTFileIOTestLoader;
+import org.apache.paimon.rest.RESTTestFileIO;
+import org.apache.paimon.rest.RESTToken;
import org.apache.paimon.rest.auth.AuthProviderEnum;
+import org.apache.paimon.rest.responses.ConfigResponse;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
import org.apache.flink.types.Row;
import org.junit.jupiter.api.AfterEach;
@@ -32,8 +41,10 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
/** ITCase for REST catalog. */
class RESTCatalogITCase extends CatalogITCaseBase {
@@ -43,6 +54,7 @@ class RESTCatalogITCase extends CatalogITCaseBase {
private RESTCatalogServer restCatalogServer;
private String serverUrl;
+ private String dataPath;
private String warehouse;
@TempDir java.nio.file.Path tempFile;
@@ -50,8 +62,19 @@ class RESTCatalogITCase extends CatalogITCaseBase {
@Override
public void before() throws IOException {
String initToken = "init_token";
- warehouse = tempFile.toUri().toString();
- restCatalogServer = new RESTCatalogServer(warehouse, initToken);
+ dataPath = tempFile.toUri().toString();
+ warehouse = UUID.randomUUID().toString();
+ ConfigResponse config =
+ new ConfigResponse(
+ ImmutableMap.of(
+ RESTCatalogInternalOptions.PREFIX.key(),
+ "paimon",
+ RESTCatalogOptions.DATA_TOKEN_ENABLED.key(),
+ "true",
+ CatalogOptions.WAREHOUSE.key(),
+ warehouse),
+ ImmutableMap.of());
+ restCatalogServer = new RESTCatalogServer(dataPath, initToken, config,
warehouse);
restCatalogServer.start();
serverUrl = restCatalogServer.getUrl();
super.before();
@@ -94,19 +117,38 @@ class RESTCatalogITCase extends CatalogITCaseBase {
DATABASE_NAME, TABLE_NAME));
}
- @Override
- protected Map<String, String> catalogOptions() {
- String initToken = "init_token";
- Map<String, String> options = new HashMap<>();
- options.put("metastore", "rest");
- options.put(RESTCatalogOptions.URI.key(), serverUrl);
- options.put(RESTCatalogOptions.TOKEN.key(), initToken);
- options.put(RESTCatalogOptions.TOKEN_PROVIDER.key(),
AuthProviderEnum.BEAR.identifier());
- return options;
+ @Test
+ public void testWriteAndRead() {
+ batchSql(
+ String.format(
+ "INSERT INTO %s.%s VALUES ('1', 11), ('2', 22)",
+ DATABASE_NAME, TABLE_NAME));
+ assertThat(batchSql(String.format("SELECT * FROM %s.%s",
DATABASE_NAME, TABLE_NAME)))
+ .containsExactlyInAnyOrder(Row.of("1", 11.0D), Row.of("2",
22.0D));
}
@Test
- public void testWriteAndRead() {
+ public void testExpiredDataToken() {
+ Identifier identifier = Identifier.create(DATABASE_NAME, TABLE_NAME);
+ RESTToken expiredDataToken =
+ new RESTToken(
+ ImmutableMap.of(
+ "akId", "akId-expire", "akSecret",
UUID.randomUUID().toString()),
+ System.currentTimeMillis() - 100_000);
+ restCatalogServer.setDataToken(identifier, expiredDataToken);
+ assertThrows(
+ RuntimeException.class,
+ () ->
+ batchSql(
+ String.format(
+ "INSERT INTO %s.%s VALUES ('1', 11),
('2', 22)",
+ DATABASE_NAME, TABLE_NAME)));
+ // update token and retry
+ RESTToken dataToken =
+ new RESTToken(
+ ImmutableMap.of("akId", "akId", "akSecret",
UUID.randomUUID().toString()),
+ System.currentTimeMillis() + 100_000);
+ restCatalogServer.setDataToken(identifier, dataToken);
batchSql(
String.format(
"INSERT INTO %s.%s VALUES ('1', 11), ('2', 22)",
@@ -115,9 +157,25 @@ class RESTCatalogITCase extends CatalogITCaseBase {
.containsExactlyInAnyOrder(Row.of("1", 11.0D), Row.of("2",
22.0D));
}
+ @Override
+ protected Map<String, String> catalogOptions() {
+ String initToken = "init_token";
+ Map<String, String> options = new HashMap<>();
+ options.put("metastore", "rest");
+ options.put(CatalogOptions.WAREHOUSE.key(), warehouse);
+ options.put(RESTCatalogOptions.URI.key(), serverUrl);
+ options.put(RESTCatalogOptions.TOKEN.key(), initToken);
+ options.put(RESTCatalogOptions.TOKEN_PROVIDER.key(),
AuthProviderEnum.BEAR.identifier());
+ options.put(RESTCatalogOptions.DATA_TOKEN_ENABLED.key(), "true");
+ options.put(
+ RESTTestFileIO.DATA_PATH_CONF_KEY,
+ dataPath.replaceFirst("file", RESTFileIOTestLoader.SCHEME));
+ return options;
+ }
+
@Override
protected String getTempDirPath() {
- return this.warehouse;
+ return this.dataPath;
}
@Override
diff --git
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java
index cd2acdbe86..d15e14c7d5 100644
---
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java
+++
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java
@@ -18,8 +18,13 @@
package org.apache.paimon.spark;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.rest.RESTCatalogInternalOptions;
import org.apache.paimon.rest.RESTCatalogServer;
import org.apache.paimon.rest.auth.AuthProviderEnum;
+import org.apache.paimon.rest.responses.ConfigResponse;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterEach;
@@ -28,6 +33,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
+import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
@@ -36,14 +42,24 @@ public class SparkCatalogWithRestTest {
private RESTCatalogServer restCatalogServer;
private String serverUrl;
+ private String dataPath;
private String warehouse;
- private String initToken = "init_token";
@TempDir java.nio.file.Path tempFile;
+ private String initToken = "init_token";
@BeforeEach
public void before() throws IOException {
- warehouse = tempFile.toUri().toString();
- restCatalogServer = new RESTCatalogServer(warehouse, initToken);
+ dataPath = tempFile.toUri().toString();
+ warehouse = UUID.randomUUID().toString();
+ ConfigResponse config =
+ new ConfigResponse(
+ ImmutableMap.of(
+ RESTCatalogInternalOptions.PREFIX.key(),
+ "paimon",
+ CatalogOptions.WAREHOUSE.key(),
+ warehouse),
+ ImmutableMap.of());
+ restCatalogServer = new RESTCatalogServer(dataPath, initToken, config,
warehouse);
restCatalogServer.start();
serverUrl = restCatalogServer.getUrl();
}
@@ -61,6 +77,7 @@ public class SparkCatalogWithRestTest {
.config("spark.sql.catalog.paimon.metastore", "rest")
.config("spark.sql.catalog.paimon.uri", serverUrl)
.config("spark.sql.catalog.paimon.token", initToken)
+ .config("spark.sql.catalog.paimon.warehouse",
warehouse)
.config(
"spark.sql.catalog.paimon.token.provider",
AuthProviderEnum.BEAR.identifier())