This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 48dce34f35 [core] RESTCatalog: support data token IT test (#5184)
48dce34f35 is described below

commit 48dce34f357f84de5190c7529bfc26fb04166795
Author: jerry <[email protected]>
AuthorDate: Tue Mar 4 16:11:36 2025 +0800

    [core] RESTCatalog: support data token IT test (#5184)
---
 .../org/apache/paimon/rest/DataTokenProvider.java  |  61 --------
 .../org/apache/paimon/rest/DataTokenStore.java     |  55 ++++++++
 .../org/apache/paimon/rest/RESTCatalogServer.java  | 153 +++++++++++++--------
 .../org/apache/paimon/rest/RESTCatalogTest.java    | 147 ++++++++++++++------
 .../apache/paimon/rest/RESTFileIOTestLoader.java   |  41 ++++++
 .../org/apache/paimon/rest/RESTTestFileIO.java     | 129 +++++++++++++++++
 .../services/org.apache.paimon.fs.FileIOLoader     |   1 +
 .../org/apache/paimon/flink/RESTCatalogITCase.java |  84 +++++++++--
 .../paimon/spark/SparkCatalogWithRestTest.java     |  23 +++-
 9 files changed, 519 insertions(+), 175 deletions(-)

diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/DataTokenProvider.java b/paimon-core/src/test/java/org/apache/paimon/rest/DataTokenProvider.java
deleted file mode 100644
index 081ada0fba..0000000000
--- a/paimon-core/src/test/java/org/apache/paimon/rest/DataTokenProvider.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.rest;
-
-import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
-
-import java.util.Map;
-
-/** Refresh data token in test mode. */
-public class DataTokenProvider {
-
-    private Map<String, String> token;
-    private long expiresAtMillis;
-
-    public DataTokenProvider(Map<String, String> token, long expiresAtMillis) {
-        this.token = token;
-        this.expiresAtMillis = expiresAtMillis;
-    }
-
-    public void setExpiresAtMillis(long expiresAtMillis) {
-        this.expiresAtMillis = expiresAtMillis;
-    }
-
-    public Map<String, String> getToken() {
-        return token;
-    }
-
-    public long getExpiresAtMillis() {
-        return expiresAtMillis;
-    }
-
-    public void setToken(Map<String, String> token) {
-        this.token = token;
-    }
-
-    public void refresh() {
-        this.token =
-                ImmutableMap.of(
-                        "ak",
-                        "ak-" + System.currentTimeMillis(),
-                        "sk",
-                        "sk-" + System.currentTimeMillis());
-        this.expiresAtMillis = System.currentTimeMillis();
-    }
-}
diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/DataTokenStore.java b/paimon-core/src/test/java/org/apache/paimon/rest/DataTokenStore.java
new file mode 100644
index 0000000000..87eb470028
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/DataTokenStore.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** DataTokenStore is used to store data token. */
+public class DataTokenStore {
+
+    // as warehouse means one catalog instance, so we use warehouse as key to store data token
+    private static final Map<String, Map<String, RESTToken>> warehouse2DataTokenStore =
+            new HashMap<>();
+
+    public static void putDataToken(String warehouse, String tableFullName, RESTToken dataToken) {
+        Map<String, RESTToken> dataTokenStore = warehouse2DataTokenStore.get(warehouse);
+        if (dataTokenStore == null) {
+            dataTokenStore = new HashMap<>();
+            warehouse2DataTokenStore.put(warehouse, dataTokenStore);
+        }
+        dataTokenStore.put(tableFullName, dataToken);
+    }
+
+    public static RESTToken getDataToken(String warehouse, String tableFullName) {
+        Map<String, RESTToken> dataTokenStore = warehouse2DataTokenStore.get(warehouse);
+        if (dataTokenStore == null) {
+            return null;
+        }
+        return dataTokenStore.get(tableFullName);
+    }
+
+    public static void removeDataToken(String warehouse, String tableFullName) {
+        Map<String, RESTToken> dataTokenStore = warehouse2DataTokenStore.get(warehouse);
+        if (dataTokenStore != null && dataTokenStore.containsKey(tableFullName)) {
+            dataTokenStore.remove(tableFullName);
+            warehouse2DataTokenStore.put(warehouse, dataTokenStore);
+        }
+    }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index 30d7b0a598..4f96d0727d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -27,9 +27,13 @@ import org.apache.paimon.catalog.FileSystemCatalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.catalog.PropertyChange;
 import org.apache.paimon.catalog.RenamingSnapshotCommit;
+import org.apache.paimon.catalog.SupportsBranches;
+import org.apache.paimon.catalog.SupportsSnapshots;
 import org.apache.paimon.catalog.TableMetadata;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.fs.local.LocalFileIOLoader;
 import org.apache.paimon.operation.Lock;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.options.Options;
@@ -50,6 +54,7 @@ import org.apache.paimon.rest.requests.MarkDonePartitionsRequest;
 import org.apache.paimon.rest.requests.RenameTableRequest;
 import org.apache.paimon.rest.responses.AlterDatabaseResponse;
 import org.apache.paimon.rest.responses.CommitTableResponse;
+import org.apache.paimon.rest.responses.ConfigResponse;
 import org.apache.paimon.rest.responses.CreateDatabaseResponse;
 import org.apache.paimon.rest.responses.ErrorResponse;
 import org.apache.paimon.rest.responses.ErrorResponseResourceType;
@@ -66,7 +71,9 @@ import org.apache.paimon.rest.responses.ListViewsResponse;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.CatalogEnvironment;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.utils.BranchManager;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.view.View;
@@ -101,8 +108,8 @@ import static org.apache.paimon.rest.RESTObjectMapper.OBJECT_MAPPER;
 /** Mock REST server for testing. */
 public class RESTCatalogServer {
 
-    private static final String PREFIX = "paimon";
-    private static final String DATABASE_URI = String.format("/v1/%s/databases", PREFIX);
+    private final String prefix;
+    private final String databaseUri;
 
     private final FileSystemCatalog catalog;
     private final Dispatcher dispatcher;
@@ -114,23 +121,34 @@ public class RESTCatalogServer {
     public final Map<String, List<Partition>> tablePartitionsStore = new HashMap<>();
     public final Map<String, View> viewStore = new HashMap<>();
     public final Map<String, Snapshot> tableSnapshotStore = new HashMap<>();
-    Map<String, RESTToken> dataTokenStore = new HashMap<>();
-
-    public RESTCatalogServer(String warehouse, String initToken) {
+    public final ConfigResponse configResponse;
+    public final String warehouse;
+
+    private ResourcePaths resourcePaths;
+
+    public RESTCatalogServer(
+            String dataPath, String initToken, ConfigResponse config, String warehouse) {
+        this.warehouse = warehouse;
+        this.configResponse = config;
+        this.prefix =
+                this.configResponse.getDefaults().get(RESTCatalogInternalOptions.PREFIX.key());
+        ResourcePaths resourcePaths = new ResourcePaths(prefix);
+        this.databaseUri = resourcePaths.databases();
         authToken = initToken;
         Options conf = new Options();
-        conf.setString("warehouse", warehouse);
+        this.configResponse.getDefaults().forEach((k, v) -> conf.setString(k, v));
+        conf.setString(CatalogOptions.WAREHOUSE.key(), dataPath);
         CatalogContext context = CatalogContext.create(conf);
-        Path warehousePath = new Path(warehouse);
+        Path warehousePath = new Path(dataPath);
         FileIO fileIO;
         try {
-            fileIO = FileIO.get(warehousePath, context);
+            fileIO = new LocalFileIO();
             fileIO.checkOrMkdirs(warehousePath);
         } catch (IOException e) {
             throw new UncheckedIOException(e);
         }
         this.catalog = new FileSystemCatalog(fileIO, warehousePath, context.options());
-        this.dispatcher = initDispatcher(warehouse, authToken);
+        this.dispatcher = initDispatcher(authToken);
         MockWebServer mockWebServer = new MockWebServer();
         mockWebServer.setDispatcher(dispatcher);
         server = mockWebServer;
@@ -153,10 +171,18 @@ public class RESTCatalogServer {
     }
 
     public void setDataToken(Identifier identifier, RESTToken token) {
-        dataTokenStore.put(identifier.getFullName(), token);
+        DataTokenStore.putDataToken(warehouse, identifier.getFullName(), token);
+    }
+
+    public void removeDataToken(Identifier identifier) {
+        DataTokenStore.removeDataToken(warehouse, identifier.getFullName());
     }
 
-    public Dispatcher initDispatcher(String warehouse, String authToken) {
+    public RESTToken getDataToken(Identifier identifier) {
+        return DataTokenStore.getDataToken(warehouse, identifier.getFullName());
+    }
+
+    public Dispatcher initDispatcher(String authToken) {
         return new Dispatcher() {
             @Override
             public MockResponse dispatch(RecordedRequest request) {
@@ -167,23 +193,14 @@ public class RESTCatalogServer {
                     if (!("Bearer " + authToken).equals(token)) {
                         return new MockResponse().setResponseCode(401);
                     }
-                    if (request.getPath().startsWith("/v1/config")) {
-                        String body =
-                                String.format(
-                                        "{\"defaults\": {\"%s\": \"%s\", \"%s\": \"%s\", \"%s\": \"%s\"}}",
-                                        RESTCatalogInternalOptions.PREFIX.key(),
-                                        PREFIX,
-                                        CatalogOptions.WAREHOUSE.key(),
-                                        warehouse,
-                                        "header.test-header",
-                                        "test-value");
-                        return new MockResponse().setResponseCode(200).setBody(body);
-                    } else if (DATABASE_URI.equals(request.getPath())) {
+                    if (request.getPath().equals(resourcePaths.config(warehouse))) {
+                        return mockResponse(configResponse, 200);
+                    } else if (databaseUri.equals(request.getPath())) {
                         return databasesApiHandler(request);
-                    } else if (request.getPath().startsWith(DATABASE_URI)) {
+                    } else if (request.getPath().startsWith(databaseUri)) {
                         String[] resources =
                                 request.getPath()
-                                        .substring((DATABASE_URI + "/").length())
+                                        .substring((databaseUri + "/").length())
                                         .split("/");
                         String databaseName = resources[0];
                         if (!databaseStore.containsKey(databaseName)) {
@@ -275,7 +292,7 @@ public class RESTCatalogServer {
                                     identifier, markDonePartitionsRequest.getPartitionSpecs());
                             return new MockResponse().setResponseCode(200);
                         } else if (isPartitions) {
-                            return partitionsApiHandler(request, identifier);
+                            return partitionsApiHandle(request, identifier);
                         } else if (isBranches) {
                             FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
                             BranchManager branchManager = table.branchManager();
@@ -312,25 +329,25 @@ public class RESTCatalogServer {
                                     return new MockResponse().setResponseCode(404);
                             }
                         } else if (isTableToken) {
-                            return handleDataToken(identifier);
+                            return getDataTokenHandle(identifier);
                         } else if (isTableSnapshot) {
-                            return handleSnapshot(identifier);
+                            return snapshotHandle(identifier);
                         } else if (isTableRename) {
-                            return renameTableApiHandler(request);
+                            return renameTableHandle(request);
                         } else if (isTableCommit) {
-                            return commitTableApiHandler(request);
+                            return commitTableHandle(request);
                         } else if (isTable) {
-                            return tableApiHandler(request, identifier);
+                            return tableHandle(request, identifier);
                         } else if (isTables) {
-                            return tablesApiHandler(request, databaseName);
+                            return tablesHandle(request, databaseName);
                         } else if (isViews) {
-                            return viewsApiHandler(request, databaseName);
+                            return viewsHandle(request, databaseName);
                         } else if (isViewRename) {
-                            return renameViewApiHandler(request);
+                            return renameViewHandle(request);
                         } else if (isView) {
-                            return viewApiHandler(request, identifier);
+                            return viewHandle(request, identifier);
                         } else {
-                            return databaseApiHandler(request, databaseName);
+                            return databaseHandle(request, databaseName);
                         }
                     }
                     return new MockResponse().setResponseCode(404);
@@ -421,12 +438,10 @@ public class RESTCatalogServer {
         };
     }
 
-    private MockResponse handleDataToken(Identifier tableIdentifier) throws Exception {
-        RESTToken dataToken;
-        if (dataTokenStore.containsKey(tableIdentifier.getFullName())) {
-            dataToken = dataTokenStore.get(tableIdentifier.getFullName());
-        } else {
-            long currentTimeMillis = System.currentTimeMillis();
+    private MockResponse getDataTokenHandle(Identifier tableIdentifier) throws Exception {
+        RESTToken dataToken = getDataToken(tableIdentifier);
+        if (dataToken == null) {
+            long currentTimeMillis = System.currentTimeMillis() + 60_000;
             dataToken =
                     new RESTToken(
                             ImmutableMap.of(
@@ -435,7 +450,7 @@ public class RESTCatalogServer {
                                     "akSecret",
                                     "akSecret" + currentTimeMillis),
                             currentTimeMillis);
-            dataTokenStore.put(tableIdentifier.getFullName(), dataToken);
+            DataTokenStore.putDataToken(warehouse, tableIdentifier.getFullName(), dataToken);
         }
         GetTableTokenResponse getTableTokenResponse =
                 new GetTableTokenResponse(dataToken.token(), dataToken.expireAtMillis());
@@ -444,7 +459,7 @@ public class RESTCatalogServer {
                 .setBody(OBJECT_MAPPER.writeValueAsString(getTableTokenResponse));
     }
 
-    private MockResponse handleSnapshot(Identifier identifier) throws Exception {
+    private MockResponse snapshotHandle(Identifier identifier) throws Exception {
         RESTResponse response;
         Optional<Snapshot> snapshotOptional =
                 Optional.ofNullable(tableSnapshotStore.get(identifier.getFullName()));
@@ -480,10 +495,11 @@ public class RESTCatalogServer {
                         new ErrorResponse(ErrorResponseResourceType.TABLE, null, "", 404), 404));
     }
 
-    private MockResponse commitTableApiHandler(RecordedRequest request) throws Exception {
+    private MockResponse commitTableHandle(RecordedRequest request) throws Exception {
         CommitTableRequest requestBody =
                 OBJECT_MAPPER.readValue(request.getBody().readUtf8(), CommitTableRequest.class);
-        FileStoreTable table = (FileStoreTable) catalog.getTable(requestBody.getIdentifier());
+        Identifier identifier = requestBody.getIdentifier();
+        FileStoreTable table = getFileTable(identifier);
         RenamingSnapshotCommit commit =
                 new RenamingSnapshotCommit(table.snapshotManager(), Lock.empty());
         String branchName = requestBody.getIdentifier().getBranchName();
@@ -519,7 +535,7 @@ public class RESTCatalogServer {
         }
     }
 
-    private MockResponse databaseApiHandler(RecordedRequest request, String databaseName)
+    private MockResponse databaseHandle(RecordedRequest request, String databaseName)
             throws Exception {
         RESTResponse response;
         Database database;
@@ -580,7 +596,7 @@ public class RESTCatalogServer {
         return new MockResponse().setResponseCode(404);
     }
 
-    private MockResponse tablesApiHandler(RecordedRequest request, String databaseName)
+    private MockResponse tablesHandle(RecordedRequest request, String databaseName)
             throws Exception {
         RESTResponse response;
         if (databaseStore.containsKey(databaseName)) {
@@ -629,7 +645,7 @@ public class RESTCatalogServer {
         return Options.fromMap(schema.options()).get(TYPE) == FORMAT_TABLE;
     }
 
-    private MockResponse tableApiHandler(RecordedRequest request, Identifier identifier)
+    private MockResponse tableHandle(RecordedRequest request, Identifier identifier)
             throws Exception {
         RESTResponse response;
         if (tableMetadataStore.containsKey(identifier.getFullName())) {
@@ -666,7 +682,7 @@ public class RESTCatalogServer {
         }
     }
 
-    private MockResponse renameTableApiHandler(RecordedRequest request) throws Exception {
+    private MockResponse renameTableHandle(RecordedRequest request) throws Exception {
         RenameTableRequest requestBody =
                 OBJECT_MAPPER.readValue(request.getBody().readUtf8(), RenameTableRequest.class);
         Identifier fromTable = requestBody.getSource();
@@ -684,7 +700,7 @@ public class RESTCatalogServer {
         return new MockResponse().setResponseCode(200);
     }
 
-    private MockResponse partitionsApiHandler(RecordedRequest request, Identifier tableIdentifier)
+    private MockResponse partitionsApiHandle(RecordedRequest request, Identifier tableIdentifier)
             throws Exception {
         RESTResponse response;
         switch (request.getMethod()) {
@@ -708,7 +724,7 @@ public class RESTCatalogServer {
         }
     }
 
-    private MockResponse viewsApiHandler(RecordedRequest request, String databaseName)
+    private MockResponse viewsHandle(RecordedRequest request, String databaseName)
             throws Exception {
         RESTResponse response;
         switch (request.getMethod()) {
@@ -747,7 +763,7 @@ public class RESTCatalogServer {
         }
     }
 
-    private MockResponse viewApiHandler(RecordedRequest request, Identifier identifier)
+    private MockResponse viewHandle(RecordedRequest request, Identifier identifier)
             throws Exception {
         RESTResponse response;
         if (viewStore.containsKey(identifier.getFullName())) {
@@ -776,7 +792,7 @@ public class RESTCatalogServer {
         throw new Catalog.ViewNotExistException(identifier);
     }
 
-    private MockResponse renameViewApiHandler(RecordedRequest request) throws Exception {
+    private MockResponse renameViewHandle(RecordedRequest request) throws Exception {
         RenameTableRequest requestBody =
                 OBJECT_MAPPER.readValue(request.getBody().readUtf8(), RenameTableRequest.class);
         Identifier fromView = requestBody.getSource();
@@ -830,7 +846,7 @@ public class RESTCatalogServer {
     private boolean commitSnapshot(
             Identifier identifier, Snapshot snapshot, List<Partition> statistics)
             throws Catalog.TableNotExistException {
-        FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+        FileStoreTable table = getFileTable(identifier);
         RenamingSnapshotCommit commit =
                 new RenamingSnapshotCommit(table.snapshotManager(), Lock.empty());
         String branchName = identifier.getBranchName();
@@ -923,7 +939,9 @@ public class RESTCatalogServer {
             Identifier identifier, long schemaId, Schema schema, String uuid, boolean isExternal) {
         Map<String, String> options = new HashMap<>(schema.options());
         Path path = catalog.getTableLocation(identifier);
-        options.put(PATH.key(), path.toString());
+        String restPath =
+                path.toString().replaceFirst(LocalFileIOLoader.SCHEME, RESTFileIOTestLoader.SCHEME);
+        options.put(PATH.key(), restPath);
         TableSchema tableSchema =
                 new TableSchema(
                         schemaId,
@@ -945,4 +963,27 @@ public class RESTCatalogServer {
         // todo: need update
         return new Partition(spec, 123, 456, 789, 123);
     }
+
+    private FileStoreTable getFileTable(Identifier identifier)
+            throws Catalog.TableNotExistException {
+        if (tableMetadataStore.containsKey(identifier.getFullName())) {
+            TableMetadata tableMetadata = tableMetadataStore.get(identifier.getFullName());
+            TableSchema schema = tableMetadata.schema();
+            CatalogEnvironment catalogEnv =
+                    new CatalogEnvironment(
+                            identifier,
+                            tableMetadata.uuid(),
+                            catalog.catalogLoader(),
+                            catalog.lockFactory().orElse(null),
+                            catalog.lockContext().orElse(null),
+                            catalog instanceof SupportsSnapshots,
+                            catalog instanceof SupportsBranches);
+            Path path = new Path(schema.options().get(PATH.key()));
+            FileIO dataFileIO = catalog.fileIO();
+            FileStoreTable table =
+                    FileStoreTableFactory.create(dataFileIO, path, schema, catalogEnv);
+            return table;
+        }
+        throw new Catalog.TableNotExistException(identifier);
+    }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index 92c6db658d..dc9bf62743 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -33,6 +33,7 @@ import org.apache.paimon.rest.auth.AuthProviderEnum;
 import org.apache.paimon.rest.auth.BearTokenAuthProvider;
 import org.apache.paimon.rest.auth.RESTAuthParameter;
 import org.apache.paimon.rest.exceptions.NotAuthorizedException;
+import org.apache.paimon.rest.responses.ConfigResponse;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.BatchTableCommit;
@@ -53,6 +54,8 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -66,20 +69,38 @@ import static org.apache.paimon.utils.SnapshotManagerTest.createSnapshotWithMill
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 /** Test for REST Catalog. */
 class RESTCatalogTest extends CatalogTestBase {
 
     private RESTCatalogServer restCatalogServer;
     private String initToken = "init_token";
+    private String serverDefineHeaderName = "test-header";
+    private String serverDefineHeaderValue = "test-value";
+    private ConfigResponse config;
+    private Options options = new Options();
+    private String dataPath;
 
     @BeforeEach
     @Override
     public void setUp() throws Exception {
         super.setUp();
-        restCatalogServer = new RESTCatalogServer(warehouse, initToken);
+        dataPath = warehouse;
+        String restWarehouse = UUID.randomUUID().toString();
+        this.config =
+                new ConfigResponse(
+                        ImmutableMap.of(
+                                RESTCatalogInternalOptions.PREFIX.key(),
+                                "paimon",
+                                "header." + serverDefineHeaderName,
+                                serverDefineHeaderValue,
+                                CatalogOptions.WAREHOUSE.key(),
+                                restWarehouse),
+                        ImmutableMap.of());
+        restCatalogServer = new RESTCatalogServer(dataPath, initToken, this.config, restWarehouse);
         restCatalogServer.start();
-        Options options = new Options();
+        options.set(CatalogOptions.WAREHOUSE.key(), restWarehouse);
         options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl());
         options.set(RESTCatalogOptions.TOKEN, initToken);
         options.set(RESTCatalogOptions.TOKEN_PROVIDER, AuthProviderEnum.BEAR.identifier());
@@ -113,7 +134,7 @@ class RESTCatalogTest extends CatalogTestBase {
         Map<String, String> headers = restCatalog.headers(restAuthParameter);
         assertEquals(
                 headers.get(BearTokenAuthProvider.AUTHORIZATION_HEADER_KEY), "Bearer init_token");
-        assertEquals(headers.get("test-header"), "test-value");
+        assertEquals(headers.get(serverDefineHeaderName), serverDefineHeaderValue);
     }
 
     @Test
@@ -150,8 +171,7 @@ class RESTCatalogTest extends CatalogTestBase {
 
             RESTTokenFileIO fileIO = (RESTTokenFileIO) fileStoreTable.fileIO();
             RESTToken fileDataToken = fileIO.validToken();
-            RESTToken serverDataToken =
-                    restCatalogServer.dataTokenStore.get(identifier.getFullName());
+            RESTToken serverDataToken = restCatalogServer.getDataToken(identifier);
             assertEquals(serverDataToken, fileDataToken);
         }
     }
@@ -183,11 +203,7 @@ class RESTCatalogTest extends CatalogTestBase {
 
     @Test
     void testSnapshotFromREST() throws Exception {
-        Options options = new Options();
-        options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl());
-        options.set(RESTCatalogOptions.TOKEN, initToken);
-        options.set(RESTCatalogOptions.TOKEN_PROVIDER, AuthProviderEnum.BEAR.identifier());
-        RESTCatalog catalog = new RESTCatalog(CatalogContext.create(options));
+        RESTCatalog catalog = (RESTCatalog) this.catalog;
         Identifier hasSnapshotTableIdentifier = Identifier.create("test_db_a", "my_snapshot_table");
         createTable(hasSnapshotTableIdentifier, Maps.newHashMap(), Lists.newArrayList("col1"));
         long id = 10086;
@@ -205,41 +221,60 @@ class RESTCatalogTest extends CatalogTestBase {
     }
 
     @Test
-    public void testBatchRecordsWrite() throws Exception {
+    public void testDataTokenExpired() throws Exception {
+        // Use a catalog with data tokens enabled and the test file IO data path configured.
+        this.catalog = initDataTokenCatalog();
+        Identifier tableId =
+                Identifier.create("test_data_token", "table_for_expired_date_token");
+        createTable(tableId, Maps.newHashMap(), Lists.newArrayList("col1"));
+
+        // Register a server-side token whose expiry lies 100 seconds in the past.
+        restCatalogServer.setDataToken(
+                tableId,
+                new RESTToken(
+                        ImmutableMap.of(
+                                "akId", "akId-expire", "akSecret", UUID.randomUUID().toString()),
+                        System.currentTimeMillis() - 100_000));
+
+        FileStoreTable table = (FileStoreTable) catalog.getTable(tableId);
+        List<Integer> rows = Lists.newArrayList(12);
+
+        // The write must fail, and the cause must carry the expired-token message.
+        Exception failure =
+                assertThrows(UncheckedIOException.class, () -> batchWrite(table, rows));
+        assertEquals(RESTTestFileIO.TOKEN_EXPIRED_MSG, failure.getCause().getMessage());
+
+        // Replace the token with one valid for another 100 seconds; the same table handle
+        // is then expected to write and read successfully.
+        restCatalogServer.setDataToken(
+                tableId,
+                new RESTToken(
+                        ImmutableMap.of("akId", "akId", "akSecret", UUID.randomUUID().toString()),
+                        System.currentTimeMillis() + 100_000));
+        batchWrite(table, rows);
+        assertThat(batchRead(table)).containsExactlyInAnyOrder("+I[12]");
+    }
+
+    @Test
+    public void testDataTokenUnExistInServer() throws Exception {
+        // Use a catalog with data tokens enabled and the test file IO data path configured.
+        this.catalog = initDataTokenCatalog();
+        Identifier tableId =
+                Identifier.create("test_data_token", "table_for_un_exist_date_token");
+        createTable(tableId, Maps.newHashMap(), Lists.newArrayList("col1"));
+
+        FileStoreTable table = (FileStoreTable) catalog.getTable(tableId);
+        List<Integer> rows = Lists.newArrayList(12);
+
+        // RESTTokenFileIO initializes lazily, so call isObjectStore() to force it to set up
+        // its file IO before the token is removed from the server.
+        RESTTokenFileIO tokenFileIO = (RESTTokenFileIO) table.fileIO();
+        tokenFileIO.isObjectStore();
+        restCatalogServer.removeDataToken(tableId);
+
+        // With no token on the server, the write must fail with the missing-token message.
+        Exception failure =
+                assertThrows(UncheckedIOException.class, () -> batchWrite(table, rows));
+        assertEquals(RESTTestFileIO.TOKEN_UN_EXIST_MSG, failure.getCause().getMessage());
+    }
 
+    @Test
+    public void testBatchRecordsWrite() throws Exception {
         Identifier tableIdentifier = Identifier.create("my_db", "my_table");
         createTable(tableIdentifier, Maps.newHashMap(), 
Lists.newArrayList("col1"));
         FileStoreTable tableTestWrite = (FileStoreTable) 
catalog.getTable(tableIdentifier);
-
         // write
-        BatchWriteBuilder writeBuilder = tableTestWrite.newBatchWriteBuilder();
-        BatchTableWrite write = writeBuilder.newWrite();
-        GenericRow record1 = GenericRow.of(12);
-        GenericRow record2 = GenericRow.of(5);
-        GenericRow record3 = GenericRow.of(18);
-        write.write(record1);
-        write.write(record2);
-        write.write(record3);
-        List<CommitMessage> messages = write.prepareCommit();
-        BatchTableCommit commit = writeBuilder.newCommit();
-        commit.commit(messages);
-        write.close();
-        commit.close();
+        batchWrite(tableTestWrite, Lists.newArrayList(12, 5, 18));
 
         // read
-        ReadBuilder readBuilder = tableTestWrite.newReadBuilder();
-        List<Split> splits = readBuilder.newScan().plan().splits();
-        TableRead read = readBuilder.newRead();
-        RecordReader<InternalRow> reader = read.createReader(splits);
-        List<String> actual = new ArrayList<>();
-        reader.forEachRemaining(
-                row -> {
-                    String rowStr =
-                            String.format("%s[%d]", 
row.getRowKind().shortString(), row.getInt(0));
-                    actual.add(rowStr);
-                });
-
-        assertThat(actual).containsExactlyInAnyOrder("+I[5]", "+I[12]", 
"+I[18]");
+        List<String> result = batchRead(tableTestWrite);
+        assertThat(result).containsExactlyInAnyOrder("+I[5]", "+I[12]", 
"+I[18]");
     }
 
     @Test
@@ -299,11 +334,39 @@ class RESTCatalogTest extends CatalogTestBase {
     }
 
     private Catalog initDataTokenCatalog() {
-        Options options = new Options();
-        options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl());
-        options.set(RESTCatalogOptions.TOKEN, initToken);
         options.set(RESTCatalogOptions.DATA_TOKEN_ENABLED, true);
-        options.set(RESTCatalogOptions.TOKEN_PROVIDER, 
AuthProviderEnum.BEAR.identifier());
+        options.set(
+                RESTTestFileIO.DATA_PATH_CONF_KEY,
+                dataPath.replaceFirst("file", RESTFileIOTestLoader.SCHEME));
         return new RESTCatalog(CatalogContext.create(options));
     }
+
+    /**
+     * Writes every value in {@code data} as a one-field row into the table and commits the
+     * rows as a single batch.
+     *
+     * <p>The writer and the committer are opened with try-with-resources so that both are
+     * closed even when writing or committing throws. Tests such as the expired-token case
+     * call this helper expecting it to fail, and would otherwise leave the writer open.
+     *
+     * @param tableTestWrite table that receives the rows
+     * @param data values to write, one row per element
+     * @throws Exception if writing, preparing the commit, committing, or closing fails
+     */
+    private void batchWrite(FileStoreTable tableTestWrite, List<Integer> data) throws Exception {
+        BatchWriteBuilder writeBuilder = tableTestWrite.newBatchWriteBuilder();
+        try (BatchTableWrite write = writeBuilder.newWrite()) {
+            for (Integer value : data) {
+                write.write(GenericRow.of(value));
+            }
+            List<CommitMessage> messages = write.prepareCommit();
+            // The committer is created only after the commit messages exist, matching the
+            // original call order.
+            try (BatchTableCommit commit = writeBuilder.newCommit()) {
+                commit.commit(messages);
+            }
+        }
+    }
+
+    /**
+     * Reads all rows of the table and renders each one as {@code <row kind>[<int column 0>]},
+     * for example {@code +I[12]}.
+     *
+     * @param tableTestWrite table to scan
+     * @return one rendered string per row, in the order the reader yields them
+     * @throws IOException if creating the reader or reading fails
+     */
+    private List<String> batchRead(FileStoreTable tableTestWrite) throws IOException {
+        ReadBuilder builder = tableTestWrite.newReadBuilder();
+        List<Split> plannedSplits = builder.newScan().plan().splits();
+        RecordReader<InternalRow> rows = builder.newRead().createReader(plannedSplits);
+
+        List<String> rendered = new ArrayList<>();
+        rows.forEachRemaining(
+                row ->
+                        rendered.add(
+                                String.format(
+                                        "%s[%d]",
+                                        row.getRowKind().shortString(), row.getInt(0))));
+        return rendered;
+    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTFileIOTestLoader.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTFileIOTestLoader.java
new file mode 100644
index 0000000000..770f9afece
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTFileIOTestLoader.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileIOLoader;
+import org.apache.paimon.fs.Path;
+
+/**
+ * Test-only {@link FileIOLoader} that reports the {@link #SCHEME} scheme and loads a {@link
+ * RESTTestFileIO} for it.
+ */
+public class RESTFileIOTestLoader implements FileIOLoader {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Scheme name returned by {@link #getScheme()}. */
+    public static final String SCHEME = "rest-test-file-io";
+
+    /** Returns {@link #SCHEME}. */
+    @Override
+    public String getScheme() {
+        return SCHEME;
+    }
+
+    /** Creates a new {@link RESTTestFileIO}; the {@code path} argument is not consulted. */
+    @Override
+    public FileIO load(Path path) {
+        return new RESTTestFileIO();
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTTestFileIO.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTTestFileIO.java
new file mode 100644
index 0000000000..58d331faa6
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTTestFileIO.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest;
+
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.FileStatus;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.Options;
+
+import java.io.IOException;
+
+import static org.apache.paimon.rest.RESTCatalogOptions.DATA_TOKEN_ENABLED;
+
+/**
+ * A {@link org.apache.paimon.fs.FileIO} implementation for testing.
+ *
+ * <p>It is used to test the RESTFileIO. It delegates every operation to {@link LocalFileIO},
+ * but when {@code DATA_TOKEN_ENABLED} is set in the configured options each overridden
+ * operation first looks up the data token of the table that owns the path in {@link
+ * DataTokenStore} and throws an {@link IOException} if that token is missing or expired.
+ */
+public class RESTTestFileIO extends LocalFileIO {
+
+    /** Message of the {@link IOException} thrown when no token is stored for the table. */
+    public static final String TOKEN_UN_EXIST_MSG = "token is null";
+
+    /** Message of the {@link IOException} thrown when the stored token has expired. */
+    public static final String TOKEN_EXPIRED_MSG = "token is expired";
+
+    /** Option key holding the data root path, expressed with the test scheme. */
+    public static final String DATA_PATH_CONF_KEY = "rest.test.data-path";
+
+    /** Suffix of the database directory name that is stripped to obtain the database name. */
+    private static final String DATABASE_DIR_SUFFIX = ".db";
+
+    // Captured in configure(); the token check and token lookup read from it.
+    private Options options;
+
+    @Override
+    public boolean isObjectStore() {
+        return false;
+    }
+
+    /** Remembers the catalog options before delegating to the parent configuration. */
+    @Override
+    public void configure(CatalogContext context) {
+        options = context.options();
+        super.configure(context);
+    }
+
+    @Override
+    public SeekableInputStream newInputStream(Path path) throws IOException {
+        checkDataToken(path);
+        return super.newInputStream(path);
+    }
+
+    @Override
+    public PositionOutputStream newOutputStream(Path path, boolean overwrite)
+            throws IOException {
+        checkDataToken(path);
+        return super.newOutputStream(path, overwrite);
+    }
+
+    @Override
+    public FileStatus getFileStatus(Path path) throws IOException {
+        checkDataToken(path);
+        return super.getFileStatus(path);
+    }
+
+    @Override
+    public FileStatus[] listStatus(Path path) throws IOException {
+        checkDataToken(path);
+        return super.listStatus(path);
+    }
+
+    @Override
+    public boolean exists(Path path) throws IOException {
+        checkDataToken(path);
+        return super.exists(path);
+    }
+
+    @Override
+    public boolean delete(Path path, boolean recursive) throws IOException {
+        checkDataToken(path);
+        return super.delete(path, recursive);
+    }
+
+    @Override
+    public boolean mkdirs(Path path) throws IOException {
+        checkDataToken(path);
+        return super.mkdirs(path);
+    }
+
+    /** Renames {@code src} to {@code dst}; only the source path is token-checked. */
+    @Override
+    public boolean rename(Path src, Path dst) throws IOException {
+        checkDataToken(src);
+        return super.rename(src, dst);
+    }
+
+    /**
+     * Verifies the data token for the table owning {@code path}. Does nothing unless {@code
+     * DATA_TOKEN_ENABLED} is set to true in the options.
+     *
+     * @throws IOException with {@link #TOKEN_UN_EXIST_MSG} when no token is stored, or with
+     *     {@link #TOKEN_EXPIRED_MSG} when the token's expiry time is before the current time
+     */
+    private void checkDataToken(Path path) throws IOException {
+        boolean isDataTokenEnabled = options.getOptional(DATA_TOKEN_ENABLED).orElse(false);
+        if (isDataTokenEnabled) {
+            RESTToken token = getToken(path);
+            if (token == null) {
+                throw new IOException(TOKEN_UN_EXIST_MSG);
+            } else if (token.expireAtMillis() < System.currentTimeMillis()) {
+                throw new IOException(TOKEN_EXPIRED_MSG);
+            }
+        }
+    }
+
+    /**
+     * Derives the database and table from {@code path} and returns the token stored for them.
+     *
+     * <p>The path is expected to have the form {@code
+     * <scheme>:<data path>/<database>.db/<table>/...}, where the data path comes from {@link
+     * #DATA_PATH_CONF_KEY}. The token is looked up by the configured warehouse and the table's
+     * full name.
+     */
+    private RESTToken getToken(Path path) {
+        String dataPath = options.get(DATA_PATH_CONF_KEY);
+        // String#replace matches literally. File-system paths must not be handed to
+        // replaceAll, which would interpret characters such as '.', '+' or '(' as regex syntax.
+        String basePath = dataPath.replace(RESTFileIOTestLoader.SCHEME + "://", "");
+        String filePath = path.toString().split(":")[1].replace(basePath, "");
+        String[] paths = filePath.split("/");
+        // Strip only the trailing ".db" so a database name that itself contains ".db" is kept.
+        String database = paths[0];
+        if (database.endsWith(DATABASE_DIR_SUFFIX)) {
+            database = database.substring(0, database.length() - DATABASE_DIR_SUFFIX.length());
+        }
+        String table = paths[1];
+        return DataTokenStore.getDataToken(
+                options.get(CatalogOptions.WAREHOUSE.key()),
+                Identifier.create(database, table).getFullName());
+    }
+}
diff --git 
a/paimon-core/src/test/resources/META-INF/services/org.apache.paimon.fs.FileIOLoader
 
b/paimon-core/src/test/resources/META-INF/services/org.apache.paimon.fs.FileIOLoader
index e15ffca931..862d2d125a 100644
--- 
a/paimon-core/src/test/resources/META-INF/services/org.apache.paimon.fs.FileIOLoader
+++ 
b/paimon-core/src/test/resources/META-INF/services/org.apache.paimon.fs.FileIOLoader
@@ -16,3 +16,4 @@
 org.apache.paimon.utils.FailingFileIO$Loader
 org.apache.paimon.utils.TraceableFileIO$Loader
 
org.apache.paimon.iceberg.migrate.IcebergMigrateHadoopMetadataTest$TestFileIOLoader
+org.apache.paimon.rest.RESTFileIOTestLoader
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
index cf66ee425b..5b4f5eae21 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
@@ -18,9 +18,18 @@
 
 package org.apache.paimon.flink;
 
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.rest.RESTCatalogInternalOptions;
 import org.apache.paimon.rest.RESTCatalogOptions;
 import org.apache.paimon.rest.RESTCatalogServer;
+import org.apache.paimon.rest.RESTFileIOTestLoader;
+import org.apache.paimon.rest.RESTTestFileIO;
+import org.apache.paimon.rest.RESTToken;
 import org.apache.paimon.rest.auth.AuthProviderEnum;
+import org.apache.paimon.rest.responses.ConfigResponse;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
 
 import org.apache.flink.types.Row;
 import org.junit.jupiter.api.AfterEach;
@@ -32,8 +41,10 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 /** ITCase for REST catalog. */
 class RESTCatalogITCase extends CatalogITCaseBase {
@@ -43,6 +54,7 @@ class RESTCatalogITCase extends CatalogITCaseBase {
 
     private RESTCatalogServer restCatalogServer;
     private String serverUrl;
+    private String dataPath;
     private String warehouse;
     @TempDir java.nio.file.Path tempFile;
 
@@ -50,8 +62,19 @@ class RESTCatalogITCase extends CatalogITCaseBase {
     @Override
     public void before() throws IOException {
         String initToken = "init_token";
-        warehouse = tempFile.toUri().toString();
-        restCatalogServer = new RESTCatalogServer(warehouse, initToken);
+        dataPath = tempFile.toUri().toString();
+        warehouse = UUID.randomUUID().toString();
+        ConfigResponse config =
+                new ConfigResponse(
+                        ImmutableMap.of(
+                                RESTCatalogInternalOptions.PREFIX.key(),
+                                "paimon",
+                                RESTCatalogOptions.DATA_TOKEN_ENABLED.key(),
+                                "true",
+                                CatalogOptions.WAREHOUSE.key(),
+                                warehouse),
+                        ImmutableMap.of());
+        restCatalogServer = new RESTCatalogServer(dataPath, initToken, config, 
warehouse);
         restCatalogServer.start();
         serverUrl = restCatalogServer.getUrl();
         super.before();
@@ -94,19 +117,38 @@ class RESTCatalogITCase extends CatalogITCaseBase {
                                 DATABASE_NAME, TABLE_NAME));
     }
 
-    @Override
-    protected Map<String, String> catalogOptions() {
-        String initToken = "init_token";
-        Map<String, String> options = new HashMap<>();
-        options.put("metastore", "rest");
-        options.put(RESTCatalogOptions.URI.key(), serverUrl);
-        options.put(RESTCatalogOptions.TOKEN.key(), initToken);
-        options.put(RESTCatalogOptions.TOKEN_PROVIDER.key(), 
AuthProviderEnum.BEAR.identifier());
-        return options;
+    @Test
+    public void testWriteAndRead() {
+        batchSql(
+                String.format(
+                        "INSERT INTO %s.%s VALUES ('1', 11), ('2', 22)",
+                        DATABASE_NAME, TABLE_NAME));
+        assertThat(batchSql(String.format("SELECT * FROM %s.%s", 
DATABASE_NAME, TABLE_NAME)))
+                .containsExactlyInAnyOrder(Row.of("1", 11.0D), Row.of("2", 
22.0D));
     }
 
     @Test
-    public void testWriteAndRead() {
+    public void testExpiredDataToken() {
+        Identifier identifier = Identifier.create(DATABASE_NAME, TABLE_NAME);
+        RESTToken expiredDataToken =
+                new RESTToken(
+                        ImmutableMap.of(
+                                "akId", "akId-expire", "akSecret", 
UUID.randomUUID().toString()),
+                        System.currentTimeMillis() - 100_000);
+        restCatalogServer.setDataToken(identifier, expiredDataToken);
+        assertThrows(
+                RuntimeException.class,
+                () ->
+                        batchSql(
+                                String.format(
+                                        "INSERT INTO %s.%s VALUES ('1', 11), 
('2', 22)",
+                                        DATABASE_NAME, TABLE_NAME)));
+        // update token and retry
+        RESTToken dataToken =
+                new RESTToken(
+                        ImmutableMap.of("akId", "akId", "akSecret", 
UUID.randomUUID().toString()),
+                        System.currentTimeMillis() + 100_000);
+        restCatalogServer.setDataToken(identifier, dataToken);
         batchSql(
                 String.format(
                         "INSERT INTO %s.%s VALUES ('1', 11), ('2', 22)",
@@ -115,9 +157,25 @@ class RESTCatalogITCase extends CatalogITCaseBase {
                 .containsExactlyInAnyOrder(Row.of("1", 11.0D), Row.of("2", 
22.0D));
     }
 
+    /**
+     * Builds the options for the REST catalog under test: bearer-token authentication against
+     * the local server, data tokens enabled, and the test file IO data path.
+     */
+    @Override
+    protected Map<String, String> catalogOptions() {
+        // Same location as dataPath, with the first "file" replaced by the test scheme.
+        String testDataPath = dataPath.replaceFirst("file", RESTFileIOTestLoader.SCHEME);
+
+        Map<String, String> conf = new HashMap<>();
+        conf.put("metastore", "rest");
+        conf.put(CatalogOptions.WAREHOUSE.key(), warehouse);
+        conf.put(RESTCatalogOptions.URI.key(), serverUrl);
+        conf.put(RESTCatalogOptions.TOKEN.key(), "init_token");
+        conf.put(RESTCatalogOptions.TOKEN_PROVIDER.key(), AuthProviderEnum.BEAR.identifier());
+        conf.put(RESTCatalogOptions.DATA_TOKEN_ENABLED.key(), "true");
+        conf.put(RESTTestFileIO.DATA_PATH_CONF_KEY, testDataPath);
+        return conf;
+    }
+
     @Override
     protected String getTempDirPath() {
-        return this.warehouse;
+        return this.dataPath;
     }
 
     @Override
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java
 
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java
index cd2acdbe86..d15e14c7d5 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java
+++ 
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java
@@ -18,8 +18,13 @@
 
 package org.apache.paimon.spark;
 
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.rest.RESTCatalogInternalOptions;
 import org.apache.paimon.rest.RESTCatalogServer;
 import org.apache.paimon.rest.auth.AuthProviderEnum;
+import org.apache.paimon.rest.responses.ConfigResponse;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
 
 import org.apache.spark.sql.SparkSession;
 import org.junit.jupiter.api.AfterEach;
@@ -28,6 +33,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
+import java.util.UUID;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -36,14 +42,24 @@ public class SparkCatalogWithRestTest {
 
     private RESTCatalogServer restCatalogServer;
     private String serverUrl;
+    private String dataPath;
     private String warehouse;
-    private String initToken = "init_token";
     @TempDir java.nio.file.Path tempFile;
+    private String initToken = "init_token";
 
     @BeforeEach
     public void before() throws IOException {
-        warehouse = tempFile.toUri().toString();
-        restCatalogServer = new RESTCatalogServer(warehouse, initToken);
+        dataPath = tempFile.toUri().toString();
+        warehouse = UUID.randomUUID().toString();
+        ConfigResponse config =
+                new ConfigResponse(
+                        ImmutableMap.of(
+                                RESTCatalogInternalOptions.PREFIX.key(),
+                                "paimon",
+                                CatalogOptions.WAREHOUSE.key(),
+                                warehouse),
+                        ImmutableMap.of());
+        restCatalogServer = new RESTCatalogServer(dataPath, initToken, config, 
warehouse);
         restCatalogServer.start();
         serverUrl = restCatalogServer.getUrl();
     }
@@ -61,6 +77,7 @@ public class SparkCatalogWithRestTest {
                         .config("spark.sql.catalog.paimon.metastore", "rest")
                         .config("spark.sql.catalog.paimon.uri", serverUrl)
                         .config("spark.sql.catalog.paimon.token", initToken)
+                        .config("spark.sql.catalog.paimon.warehouse", 
warehouse)
                         .config(
                                 "spark.sql.catalog.paimon.token.provider",
                                 AuthProviderEnum.BEAR.identifier())

Reply via email to