This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new fa528969a2 [flink] Add Tests and ITCases in flink for RESTCatalog (#4805)
fa528969a2 is described below

commit fa528969a2df03a41ef2296f910546b908940f9c
Author: jerry <[email protected]>
AuthorDate: Thu Jan 9 20:06:30 2025 +0800

    [flink] Add Tests and ITCases in flink for RESTCatalog (#4805)
---
 paimon-core/pom.xml                                |   1 -
 .../org/apache/paimon/catalog/AbstractCatalog.java |  12 +-
 .../org/apache/paimon/catalog/CatalogUtils.java    |  13 +
 .../apache/paimon/rest/DefaultErrorHandler.java    |   9 +-
 .../java/org/apache/paimon/rest/HttpClient.java    |  23 +-
 .../java/org/apache/paimon/rest/RESTCatalog.java   |  57 ++-
 .../java/org/apache/paimon/rest/ResourcePaths.java |  54 +--
 .../org/apache/paimon/rest/auth/AuthSession.java   |   2 +-
 .../rest/exceptions/AlreadyExistsException.java    |  21 +-
 .../paimon/rest/exceptions/ForbiddenException.java |   1 +
 .../rest/exceptions/NoSuchResourceException.java   |  21 +-
 .../rest/exceptions/NotAuthorizedException.java    |   1 +
 .../paimon/rest/exceptions/RESTException.java      |   1 +
 .../rest/exceptions/ServiceFailureException.java   |   1 +
 .../exceptions/ServiceUnavailableException.java    |   1 +
 ...ion.java => UnsupportedOperationException.java} |   7 +-
 .../paimon/rest/responses/ErrorResponse.java       |  36 +-
 .../ErrorResponseResourceType.java}                |  12 +-
 .../org/apache/paimon/catalog/CatalogTestBase.java | 165 +++++----
 .../org/apache/paimon/jdbc/JdbcCatalogTest.java    |   6 +-
 .../paimon/rest/DefaultErrorHandlerTest.java       |   4 +-
 .../org/apache/paimon/rest/HttpClientTest.java     |  51 ++-
 .../org/apache/paimon/rest/MockRESTMessage.java    |  14 +-
 .../org/apache/paimon/rest/RESTCatalogServer.java  | 329 ++++++++++++++++++
 .../org/apache/paimon/rest/RESTCatalogTest.java    | 381 ++++-----------------
 .../apache/paimon/rest/RESTObjectMapperTest.java   |  86 +++--
 .../apache/paimon/rest/auth/AuthSessionTest.java   |  17 +
 paimon-flink/paimon-flink-common/pom.xml           |   7 +
 .../org/apache/paimon/flink/CatalogITCaseBase.java |   8 +-
 .../org/apache/paimon/flink/RESTCatalogITCase.java | 116 +++++++
 .../org/apache/paimon/hive/HiveCatalogTest.java    |  10 +-
 pom.xml                                            |   1 +
 32 files changed, 923 insertions(+), 545 deletions(-)

diff --git a/paimon-core/pom.xml b/paimon-core/pom.xml
index 6cdb9a9c93..c4d8f0283b 100644
--- a/paimon-core/pom.xml
+++ b/paimon-core/pom.xml
@@ -33,7 +33,6 @@ under the License.
 
     <properties>
         <frocksdbjni.version>6.20.3-ververica-2.0</frocksdbjni.version>
-        <okhttp.version>4.12.0</okhttp.version>
     </properties>
 
     <dependencies>
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 2ecbcf61b3..a4c47f54a6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -63,6 +63,7 @@ import static 
org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase;
 import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable;
 import static org.apache.paimon.catalog.CatalogUtils.isSystemDatabase;
 import static 
org.apache.paimon.catalog.CatalogUtils.listPartitionsFromFileSystem;
+import static org.apache.paimon.catalog.CatalogUtils.validateAutoCreateClose;
 import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
 import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
 import static 
org.apache.paimon.table.system.AllTableOptionsTable.ALL_TABLE_OPTIONS;
@@ -504,17 +505,6 @@ public abstract class AbstractCatalog implements Catalog {
         tableDefaultOptions.forEach(options::putIfAbsent);
     }
 
-    private void validateAutoCreateClose(Map<String, String> options) {
-        checkArgument(
-                !Boolean.parseBoolean(
-                        options.getOrDefault(
-                                CoreOptions.AUTO_CREATE.key(),
-                                
CoreOptions.AUTO_CREATE.defaultValue().toString())),
-                String.format(
-                        "The value of %s property should be %s.",
-                        CoreOptions.AUTO_CREATE.key(), Boolean.FALSE));
-    }
-
     private void validateCustomTablePath(Map<String, String> options) {
         if (!allowCustomTablePath() && 
options.containsKey(CoreOptions.PATH.key())) {
             throw new UnsupportedOperationException(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
index fabfa50fc4..9267532f9d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.catalog;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.options.Options;
@@ -38,6 +39,7 @@ import static 
org.apache.paimon.CoreOptions.PARTITION_GENERATE_LEGCY_NAME;
 import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME;
 import static org.apache.paimon.catalog.Catalog.TABLE_DEFAULT_OPTION_PREFIX;
 import static 
org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Utils for {@link Catalog}. */
 public class CatalogUtils {
@@ -108,6 +110,17 @@ public class CatalogUtils {
         }
     }
 
+    public static void validateAutoCreateClose(Map<String, String> options) {
+        checkArgument(
+                !Boolean.parseBoolean(
+                        options.getOrDefault(
+                                CoreOptions.AUTO_CREATE.key(),
+                                
CoreOptions.AUTO_CREATE.defaultValue().toString())),
+                String.format(
+                        "The value of %s property should be %s.",
+                        CoreOptions.AUTO_CREATE.key(), Boolean.FALSE));
+    }
+
     public static Table createSystemTable(Identifier identifier, Table 
originTable)
             throws Catalog.TableNotExistException {
         if (!(originTable instanceof FileStoreTable)) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/DefaultErrorHandler.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/DefaultErrorHandler.java
index ce2cbb56ae..944b986b3f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/DefaultErrorHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/DefaultErrorHandler.java
@@ -26,6 +26,7 @@ import 
org.apache.paimon.rest.exceptions.NotAuthorizedException;
 import org.apache.paimon.rest.exceptions.RESTException;
 import org.apache.paimon.rest.exceptions.ServiceFailureException;
 import org.apache.paimon.rest.exceptions.ServiceUnavailableException;
+import org.apache.paimon.rest.exceptions.UnsupportedOperationException;
 import org.apache.paimon.rest.responses.ErrorResponse;
 
 /** Default error handler. */
@@ -43,18 +44,20 @@ public class DefaultErrorHandler extends ErrorHandler {
         String message = error.getMessage();
         switch (code) {
             case 400:
-                throw new BadRequestException(String.format("Malformed 
request: %s", message));
+                throw new BadRequestException(String.format("%s", message));
             case 401:
                 throw new NotAuthorizedException("Not authorized: %s", 
message);
             case 403:
                 throw new ForbiddenException("Forbidden: %s", message);
             case 404:
-                throw new NoSuchResourceException("%s", message);
+                throw new NoSuchResourceException(
+                        error.getResourceType(), error.getResourceName(), 
"%s", message);
             case 405:
             case 406:
                 break;
             case 409:
-                throw new AlreadyExistsException("%s", message);
+                throw new AlreadyExistsException(
+                        error.getResourceType(), error.getResourceName(), 
"%s", message);
             case 500:
                 throw new ServiceFailureException("Server error: %s", message);
             case 501:
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java
index 08284fc454..2862e5ef02 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java
@@ -63,7 +63,11 @@ public class HttpClient implements RESTClient {
     }
 
     public HttpClient(HttpClientOptions httpClientOptions) {
-        this.uri = httpClientOptions.uri();
+        if (httpClientOptions.uri() != null && 
httpClientOptions.uri().endsWith("/")) {
+            this.uri = httpClientOptions.uri().substring(0, 
httpClientOptions.uri().length() - 1);
+        } else {
+            this.uri = httpClientOptions.uri();
+        }
         this.okHttpClient = createHttpClient(httpClientOptions);
         this.errorHandler = DefaultErrorHandler.getInstance();
     }
@@ -132,10 +136,19 @@ public class HttpClient implements RESTClient {
         try (Response response = okHttpClient.newCall(request).execute()) {
             String responseBodyStr = response.body() != null ? 
response.body().string() : null;
             if (!response.isSuccessful()) {
-                ErrorResponse error =
-                        new ErrorResponse(
-                                responseBodyStr != null ? responseBodyStr : 
"response body is null",
-                                response.code());
+                ErrorResponse error;
+                try {
+                    error = OBJECT_MAPPER.readValue(responseBodyStr, 
ErrorResponse.class);
+                } catch (JsonProcessingException e) {
+                    error =
+                            new ErrorResponse(
+                                    null,
+                                    null,
+                                    responseBodyStr != null
+                                            ? responseBodyStr
+                                            : "response body is null",
+                                    response.code());
+                }
                 errorHandler.accept(error);
             }
             if (responseType != null && responseBodyStr != null) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index 2c36f75a37..3f7647ca84 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -35,8 +35,10 @@ import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.Partition;
 import org.apache.paimon.rest.auth.AuthSession;
 import org.apache.paimon.rest.exceptions.AlreadyExistsException;
+import org.apache.paimon.rest.exceptions.BadRequestException;
 import org.apache.paimon.rest.exceptions.ForbiddenException;
 import org.apache.paimon.rest.exceptions.NoSuchResourceException;
+import org.apache.paimon.rest.exceptions.ServiceFailureException;
 import org.apache.paimon.rest.requests.AlterDatabaseRequest;
 import org.apache.paimon.rest.requests.AlterTableRequest;
 import org.apache.paimon.rest.requests.CreateDatabaseRequest;
@@ -45,6 +47,7 @@ import org.apache.paimon.rest.requests.RenameTableRequest;
 import org.apache.paimon.rest.responses.AlterDatabaseResponse;
 import org.apache.paimon.rest.responses.ConfigResponse;
 import org.apache.paimon.rest.responses.CreateDatabaseResponse;
+import org.apache.paimon.rest.responses.ErrorResponseResourceType;
 import org.apache.paimon.rest.responses.GetDatabaseResponse;
 import org.apache.paimon.rest.responses.GetTableResponse;
 import org.apache.paimon.rest.responses.ListDatabasesResponse;
@@ -75,9 +78,12 @@ import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 
 import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE;
+import static org.apache.paimon.catalog.CatalogUtils.checkNotBranch;
 import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase;
+import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable;
 import static org.apache.paimon.catalog.CatalogUtils.isSystemDatabase;
 import static 
org.apache.paimon.catalog.CatalogUtils.listPartitionsFromFileSystem;
+import static org.apache.paimon.catalog.CatalogUtils.validateAutoCreateClose;
 import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE;
 import static org.apache.paimon.rest.RESTUtil.extractPrefixMap;
 import static org.apache.paimon.rest.auth.AuthSession.createAuthSession;
@@ -209,7 +215,7 @@ public class RESTCatalog implements Catalog {
                 throw new DatabaseNotEmptyException(name);
             }
             client.delete(resourcePaths.database(name), headers());
-        } catch (NoSuchResourceException e) {
+        } catch (NoSuchResourceException | DatabaseNotExistException e) {
             if (!ignoreIfNotExists) {
                 throw new DatabaseNotExistException(name);
             }
@@ -249,12 +255,19 @@ public class RESTCatalog implements Catalog {
 
     @Override
     public List<String> listTables(String databaseName) throws 
DatabaseNotExistException {
-        ListTablesResponse response =
-                client.get(resourcePaths.tables(databaseName), 
ListTablesResponse.class, headers());
-        if (response.getTables() != null) {
-            return response.getTables();
+        try {
+            ListTablesResponse response =
+                    client.get(
+                            resourcePaths.tables(databaseName),
+                            ListTablesResponse.class,
+                            headers());
+            if (response.getTables() != null) {
+                return response.getTables();
+            }
+            return ImmutableList.of();
+        } catch (NoSuchResourceException e) {
+            throw new DatabaseNotExistException(databaseName);
         }
-        return ImmutableList.of();
     }
 
     @Override
@@ -272,6 +285,9 @@ public class RESTCatalog implements Catalog {
     public void createTable(Identifier identifier, Schema schema, boolean 
ignoreIfExists)
             throws TableAlreadyExistException, DatabaseNotExistException {
         try {
+            checkNotBranch(identifier, "createTable");
+            checkNotSystemTable(identifier, "createTable");
+            validateAutoCreateClose(schema.options());
             CreateTableRequest request = new CreateTableRequest(identifier, 
schema);
             client.post(
                     resourcePaths.tables(identifier.getDatabaseName()),
@@ -282,12 +298,24 @@ public class RESTCatalog implements Catalog {
             if (!ignoreIfExists) {
                 throw new TableAlreadyExistException(identifier);
             }
+        } catch (NoSuchResourceException e) {
+            throw new DatabaseNotExistException(identifier.getDatabaseName());
+        } catch (BadRequestException e) {
+            throw new RuntimeException(new 
IllegalArgumentException(e.getMessage()));
+        } catch (IllegalArgumentException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
         }
     }
 
     @Override
     public void renameTable(Identifier fromTable, Identifier toTable, boolean 
ignoreIfNotExists)
             throws TableNotExistException, TableAlreadyExistException {
+        checkNotBranch(fromTable, "renameTable");
+        checkNotBranch(toTable, "renameTable");
+        checkNotSystemTable(fromTable, "renameTable");
+        checkNotSystemTable(toTable, "renameTable");
         try {
             RenameTableRequest request = new RenameTableRequest(toTable);
             client.post(
@@ -311,6 +339,7 @@ public class RESTCatalog implements Catalog {
     public void alterTable(
             Identifier identifier, List<SchemaChange> changes, boolean 
ignoreIfNotExists)
             throws TableNotExistException, ColumnAlreadyExistException, 
ColumnNotExistException {
+        checkNotSystemTable(identifier, "alterTable");
         try {
             AlterTableRequest request = new AlterTableRequest(changes);
             client.post(
@@ -320,16 +349,30 @@ public class RESTCatalog implements Catalog {
                     headers());
         } catch (NoSuchResourceException e) {
             if (!ignoreIfNotExists) {
-                throw new TableNotExistException(identifier);
+                if (e.resourceType() == ErrorResponseResourceType.TABLE) {
+                    throw new TableNotExistException(identifier);
+                } else if (e.resourceType() == 
ErrorResponseResourceType.COLUMN) {
+                    throw new ColumnNotExistException(identifier, 
e.resourceName());
+                }
             }
+        } catch (AlreadyExistsException e) {
+            throw new ColumnAlreadyExistException(identifier, 
e.resourceName());
         } catch (ForbiddenException e) {
             throw new TableNoPermissionException(identifier, e);
+        } catch 
(org.apache.paimon.rest.exceptions.UnsupportedOperationException e) {
+            throw new UnsupportedOperationException(e.getMessage());
+        } catch (ServiceFailureException e) {
+            throw new IllegalStateException(e.getMessage());
+        } catch (BadRequestException e) {
+            throw new RuntimeException(new 
IllegalArgumentException(e.getMessage()));
         }
     }
 
     @Override
     public void dropTable(Identifier identifier, boolean ignoreIfNotExists)
             throws TableNotExistException {
+        checkNotBranch(identifier, "dropTable");
+        checkNotSystemTable(identifier, "dropTable");
         try {
             client.delete(
                     resourcePaths.table(identifier.getDatabaseName(), 
identifier.getTableName()),
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java
index 780582c33c..f7d2f71169 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java
@@ -20,13 +20,17 @@ package org.apache.paimon.rest;
 
 import org.apache.paimon.options.Options;
 
-import java.util.StringJoiner;
+import org.apache.paimon.shade.guava30.com.google.common.base.Joiner;
 
 /** Resource paths for REST catalog. */
 public class ResourcePaths {
 
-    public static final String V1_CONFIG = "/v1/config";
-    private static final StringJoiner SLASH = new StringJoiner("/");
+    private static final Joiner SLASH = Joiner.on("/").skipNulls();
+    private static final String V1 = "/v1";
+    private static final String DATABASES = "databases";
+    private static final String TABLES = "tables";
+
+    public static final String V1_CONFIG = V1 + "/config";
 
     public static ResourcePaths forCatalogProperties(Options options) {
         return new 
ResourcePaths(options.get(RESTCatalogInternalOptions.PREFIX));
@@ -39,60 +43,30 @@ public class ResourcePaths {
     }
 
     public String databases() {
-        return SLASH.add("v1").add(prefix).add("databases").toString();
+        return SLASH.join(V1, prefix, DATABASES);
     }
 
     public String database(String databaseName) {
-        return 
SLASH.add("v1").add(prefix).add("databases").add(databaseName).toString();
+        return SLASH.join(V1, prefix, DATABASES, databaseName);
     }
 
     public String databaseProperties(String databaseName) {
-        return SLASH.add("v1")
-                .add(prefix)
-                .add("databases")
-                .add(databaseName)
-                .add("properties")
-                .toString();
+        return SLASH.join(V1, prefix, DATABASES, databaseName, "properties");
     }
 
     public String tables(String databaseName) {
-        return SLASH.add("v1")
-                .add(prefix)
-                .add("databases")
-                .add(databaseName)
-                .add("tables")
-                .toString();
+        return SLASH.join(V1, prefix, DATABASES, databaseName, TABLES);
     }
 
     public String table(String databaseName, String tableName) {
-        return SLASH.add("v1")
-                .add(prefix)
-                .add("databases")
-                .add(databaseName)
-                .add("tables")
-                .add(tableName)
-                .toString();
+        return SLASH.join(V1, prefix, DATABASES, databaseName, TABLES, 
tableName);
     }
 
     public String renameTable(String databaseName, String tableName) {
-        return SLASH.add("v1")
-                .add(prefix)
-                .add("databases")
-                .add(databaseName)
-                .add("tables")
-                .add(tableName)
-                .add("rename")
-                .toString();
+        return SLASH.join(V1, prefix, DATABASES, databaseName, TABLES, 
tableName, "rename");
     }
 
     public String partitions(String databaseName, String tableName) {
-        return SLASH.add("v1")
-                .add(prefix)
-                .add("databases")
-                .add(databaseName)
-                .add("tables")
-                .add(tableName)
-                .add("partitions")
-                .toString();
+        return SLASH.join(V1, prefix, DATABASES, databaseName, TABLES, 
tableName, "partitions");
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthSession.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthSession.java
index 198b098687..470af7f6f6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthSession.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthSession.java
@@ -46,8 +46,8 @@ public class AuthSession {
     private volatile Map<String, String> headers;
 
     public AuthSession(Map<String, String> headers, CredentialsProvider 
credentialsProvider) {
-        this.headers = headers;
         this.credentialsProvider = credentialsProvider;
+        this.headers = RESTUtil.merge(headers, 
this.credentialsProvider.authHeader());
     }
 
     public static AuthSession fromRefreshCredentialsProvider(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/exceptions/AlreadyExistsException.java
 
b/paimon-core/src/main/java/org/apache/paimon/rest/exceptions/AlreadyExistsException.java
index 8e30c8375b..6da7a492b6 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/rest/exceptions/AlreadyExistsException.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/rest/exceptions/AlreadyExistsException.java
@@ -18,10 +18,29 @@
 
 package org.apache.paimon.rest.exceptions;
 
+import org.apache.paimon.rest.responses.ErrorResponseResourceType;
+
 /** Exception thrown on HTTP 409 means a resource already exists. */
 public class AlreadyExistsException extends RESTException {
 
-    public AlreadyExistsException(String message, Object... args) {
+    private final ErrorResponseResourceType resourceType;
+    private final String resourceName;
+
+    public AlreadyExistsException(
+            ErrorResponseResourceType resourceType,
+            String resourceName,
+            String message,
+            Object... args) {
         super(message, args);
+        this.resourceType = resourceType;
+        this.resourceName = resourceName;
+    }
+
+    public ErrorResponseResourceType resourceType() {
+        return resourceType;
+    }
+
+    public String resourceName() {
+        return resourceName;
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/exceptions/ForbiddenException.java
 
b/paimon-core/src/main/java/org/apache/paimon/rest/exceptions/ForbiddenException.java
index 3982e5b704..76cb53bfc3 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/rest/exceptions/ForbiddenException.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/rest/exceptions/ForbiddenException.java
@@ -20,6 +20,7 @@ package org.apache.paimon.rest.exceptions;
 
 /** Exception thrown on HTTP 403 Forbidden. */
 public class ForbiddenException extends RESTException {
+
     public ForbiddenException(String message, Object... args) {
         super(message, args);
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/exceptions/NoSuchResourceException.java
 
b/paimon-core/src/main/java/org/apache/paimon/rest/exceptions/NoSuchResourceException.java
index cc4c7881f4..6dfb125671 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/rest/exceptions/NoSuchResourceException.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/rest/exceptions/NoSuchResourceException.java
@@ -18,10 +18,29 @@
 
 package org.apache.paimon.rest.exceptions;
 
+import org.apache.paimon.rest.responses.ErrorResponseResourceType;
+
 /** Exception thrown on HTTP 404 means a resource not exists. */
 public class NoSuchResourceException extends RESTException {
 
-    public NoSuchResourceException(String message, Object... args) {
+    private final ErrorResponseResourceType resourceType;
+    private final String resourceName;
+
+    public NoSuchResourceException(
+            ErrorResponseResourceType resourceType,
+            String resourceName,
+            String message,
+            Object... args) {
         super(message, args);
+        this.resourceType = resourceType;
+        this.resourceName = resourceName;
+    }
+
+    public ErrorResponseResourceType resourceType() {
+        return resourceType;
+    }
+
+    public String resourceName() {
+        return resourceName;
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/exceptions/NotAuthorizedException.java
 
b/paimon-core/src/main/java/org/apache/paimon/rest/exceptions/NotAuthorizedException.java
index 43c13b1a1c..79c9aa4e67 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/rest/exceptions/NotAuthorizedException.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/rest/exceptions/NotAuthorizedException.java
@@ -20,6 +20,7 @@ package org.apache.paimon.rest.exceptions;
 
 /** Exception thrown on HTTP 401 Unauthorized. */
 public class NotAuthorizedException extends RESTException {
+
     public NotAuthorizedException(String message, Object... args) {
         super(String.format(message, args));
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/exceptions/RESTException.java
 
b/paimon-core/src/main/java/org/apache/paimon/rest/exceptions/RESTException.java
index 532936f430..f7648c5d1e 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/rest/exceptions/RESTException.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/rest/exceptions/RESTException.java
@@ -20,6 +20,7 @@ package org.apache.paimon.rest.exceptions;
 
 /** Base class for REST client exceptions. */
 public class RESTException extends RuntimeException {
+
     public RESTException(String message, Object... args) {
         super(String.format(message, args));
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/exceptions/ServiceFailureException.java
 
b/paimon-core/src/main/java/org/apache/paimon/rest/exceptions/ServiceFailureException.java
index 45c48ec0de..1df196d90f 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/rest/exceptions/ServiceFailureException.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/rest/exceptions/ServiceFailureException.java
@@ -20,6 +20,7 @@ package org.apache.paimon.rest.exceptions;
 
 /** Exception thrown on HTTP 500 - Bad Request. */
 public class ServiceFailureException extends RESTException {
+
     public ServiceFailureException(String message, Object... args) {
         super(String.format(message, args));
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/exceptions/ServiceUnavailableException.java
 
b/paimon-core/src/main/java/org/apache/paimon/rest/exceptions/ServiceUnavailableException.java
index fb6a05e89f..c466b4c901 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/rest/exceptions/ServiceUnavailableException.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/rest/exceptions/ServiceUnavailableException.java
@@ -20,6 +20,7 @@ package org.apache.paimon.rest.exceptions;
 
 /** Exception thrown on HTTP 503 - service is unavailable. */
 public class ServiceUnavailableException extends RESTException {
+
     public ServiceUnavailableException(String message, Object... args) {
         super(String.format(message, args));
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/exceptions/ServiceUnavailableException.java
 
b/paimon-core/src/main/java/org/apache/paimon/rest/exceptions/UnsupportedOperationException.java
similarity index 81%
copy from 
paimon-core/src/main/java/org/apache/paimon/rest/exceptions/ServiceUnavailableException.java
copy to 
paimon-core/src/main/java/org/apache/paimon/rest/exceptions/UnsupportedOperationException.java
index fb6a05e89f..2feae109d3 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/rest/exceptions/ServiceUnavailableException.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/rest/exceptions/UnsupportedOperationException.java
@@ -18,9 +18,10 @@
 
 package org.apache.paimon.rest.exceptions;
 
-/** Exception thrown on HTTP 503 - service is unavailable. */
-public class ServiceUnavailableException extends RESTException {
-    public ServiceUnavailableException(String message, Object... args) {
+/** Exception thrown on HTTP 501 - UnsupportedOperationException. */
+public class UnsupportedOperationException extends RESTException {
+
+    public UnsupportedOperationException(String message, Object... args) {
         super(String.format(message, args));
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/responses/ErrorResponse.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/responses/ErrorResponse.java
index eb95ff448a..8e88a37b11 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/rest/responses/ErrorResponse.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/rest/responses/ErrorResponse.java
@@ -36,9 +36,17 @@ import java.util.List;
 public class ErrorResponse implements RESTResponse {
 
     private static final String FIELD_MESSAGE = "message";
+    private static final String FIELD_RESOURCE_TYPE = "resourceType";
+    private static final String FIELD_RESOURCE_NAME = "resourceName";
     private static final String FIELD_CODE = "code";
     private static final String FIELD_STACK = "stack";
 
+    @JsonProperty(FIELD_RESOURCE_TYPE)
+    private final ErrorResponseResourceType resourceType;
+
+    @JsonProperty(FIELD_RESOURCE_NAME)
+    private final String resourceName;
+
     @JsonProperty(FIELD_MESSAGE)
     private final String message;
 
@@ -48,7 +56,13 @@ public class ErrorResponse implements RESTResponse {
     @JsonProperty(FIELD_STACK)
     private final List<String> stack;
 
-    public ErrorResponse(String message, Integer code) {
+    public ErrorResponse(
+            ErrorResponseResourceType resourceType,
+            String resourceName,
+            String message,
+            Integer code) {
+        this.resourceType = resourceType;
+        this.resourceName = resourceName;
         this.code = code;
         this.message = message;
         this.stack = new ArrayList<String>();
@@ -56,25 +70,33 @@ public class ErrorResponse implements RESTResponse {
 
     @JsonCreator
     public ErrorResponse(
+            @JsonProperty(FIELD_RESOURCE_TYPE) ErrorResponseResourceType 
resourceType,
+            @JsonProperty(FIELD_RESOURCE_NAME) String resourceName,
             @JsonProperty(FIELD_MESSAGE) String message,
             @JsonProperty(FIELD_CODE) int code,
             @JsonProperty(FIELD_STACK) List<String> stack) {
+        this.resourceType = resourceType;
+        this.resourceName = resourceName;
         this.message = message;
         this.code = code;
         this.stack = stack;
     }
 
-    public ErrorResponse(String message, int code, Throwable throwable) {
-        this.message = message;
-        this.code = code;
-        this.stack = getStackFromThrowable(throwable);
-    }
-
     @JsonGetter(FIELD_MESSAGE)
     public String getMessage() {
         return message;
     }
 
+    @JsonGetter(FIELD_RESOURCE_TYPE)
+    public ErrorResponseResourceType getResourceType() {
+        return resourceType;
+    }
+
+    @JsonGetter(FIELD_RESOURCE_NAME)
+    public String getResourceName() {
+        return resourceName;
+    }
+
     @JsonGetter(FIELD_CODE)
     public Integer getCode() {
         return code;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/exceptions/ForbiddenException.java
 
b/paimon-core/src/main/java/org/apache/paimon/rest/responses/ErrorResponseResourceType.java
similarity index 76%
copy from 
paimon-core/src/main/java/org/apache/paimon/rest/exceptions/ForbiddenException.java
copy to 
paimon-core/src/main/java/org/apache/paimon/rest/responses/ErrorResponseResourceType.java
index 3982e5b704..590f38e720 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/rest/exceptions/ForbiddenException.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/rest/responses/ErrorResponseResourceType.java
@@ -16,11 +16,11 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.rest.exceptions;
+package org.apache.paimon.rest.responses;
 
-/** Exception thrown on HTTP 403 Forbidden. */
-public class ForbiddenException extends RESTException {
-    public ForbiddenException(String message, Object... args) {
-        super(message, args);
-    }
+/** The type of resource that caused the error. */
+public enum ErrorResponseResourceType {
+    DATABASE,
+    TABLE,
+    COLUMN,
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
index 31c4c8e682..f7aa4ab5a6 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
@@ -25,6 +25,7 @@ import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FormatTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.types.DataField;
@@ -37,7 +38,6 @@ import 
org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
 import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;
 
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -49,12 +49,16 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME;
+import static 
org.apache.paimon.table.system.AllTableOptionsTable.ALL_TABLE_OPTIONS;
+import static 
org.apache.paimon.table.system.CatalogOptionsTable.CATALOG_OPTIONS;
 import static 
org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 
 /** Base test class of paimon catalog in {@link Catalog}. */
 public abstract class CatalogTestBase {
@@ -123,7 +127,7 @@ public abstract class CatalogTestBase {
 
         List<String> databases = catalog.listDatabases();
         List<String> distinctDatabases = 
databases.stream().distinct().collect(Collectors.toList());
-        Assertions.assertEquals(distinctDatabases.size(), databases.size());
+        assertEquals(distinctDatabases.size(), databases.size());
     }
 
     @Test
@@ -146,6 +150,56 @@ public abstract class CatalogTestBase {
                 .doesNotThrowAnyException();
     }
 
+    @Test
+    public void testAlterDatabase() throws Exception {
+        if (!supportsAlterDatabase()) {
+            return;
+        }
+        // Alter database
+        String databaseName = "db_to_alter";
+        catalog.createDatabase(databaseName, false);
+        String key = "key1";
+        String key2 = "key2";
+        // Add property
+        catalog.alterDatabase(
+                databaseName,
+                Lists.newArrayList(
+                        PropertyChange.setProperty(key, "value"),
+                        PropertyChange.setProperty(key2, "value")),
+                false);
+        Database db = catalog.getDatabase(databaseName);
+        assertEquals("value", db.options().get(key));
+        assertEquals("value", db.options().get(key2));
+        // Update property
+        catalog.alterDatabase(
+                databaseName,
+                Lists.newArrayList(
+                        PropertyChange.setProperty(key, "value1"),
+                        PropertyChange.setProperty(key2, "value1")),
+                false);
+        db = catalog.getDatabase(databaseName);
+        assertEquals("value1", db.options().get(key));
+        assertEquals("value1", db.options().get(key2));
+        // remove property
+        catalog.alterDatabase(
+                databaseName,
+                Lists.newArrayList(
+                        PropertyChange.removeProperty(key), 
PropertyChange.removeProperty(key2)),
+                false);
+        db = catalog.getDatabase(databaseName);
+        assertFalse(db.options().containsKey(key));
+        assertFalse(db.options().containsKey(key2));
+        // Remove non-existent property
+        catalog.alterDatabase(
+                databaseName,
+                Lists.newArrayList(
+                        PropertyChange.removeProperty(key), 
PropertyChange.removeProperty(key2)),
+                false);
+        db = catalog.getDatabase(databaseName);
+        assertFalse(db.options().containsKey(key));
+        assertFalse(db.options().containsKey(key2));
+    }
+
     @Test
     public void testDropDatabase() throws Exception {
         // Drop database deletes the database when it exists and there are no 
tables
@@ -193,6 +247,10 @@ public abstract class CatalogTestBase {
 
         tables = catalog.listTables("test_db");
         assertThat(tables).containsExactlyInAnyOrder("table1", "table2", 
"table3");
+
+        // List tables throws DatabaseNotExistException when the database does 
not exist
+        assertThatExceptionOfType(Catalog.DatabaseNotExistException.class)
+                .isThrownBy(() -> catalog.listTables("non_existing_db"));
     }
 
     @Test
@@ -225,8 +283,17 @@ public abstract class CatalogTestBase {
                 .withMessage("The value of auto-create property should be 
false.");
         schema.options().remove(CoreOptions.AUTO_CREATE.key());
 
+        // Create table and check the schema
+        schema.options().put("k1", "v1");
         catalog.createTable(identifier, schema, false);
-        catalog.getTable(identifier);
+        FileStoreTable dataTable = (FileStoreTable) 
catalog.getTable(identifier);
+        
assertThat(dataTable.schema().toSchema().fields()).isEqualTo(schema.fields());
+        
assertThat(dataTable.schema().toSchema().partitionKeys()).isEqualTo(schema.partitionKeys());
+        
assertThat(dataTable.schema().toSchema().comment()).isEqualTo(schema.comment());
+        
assertThat(dataTable.schema().toSchema().primaryKeys()).isEqualTo(schema.primaryKeys());
+        for (Map.Entry<String, String> option : schema.options().entrySet()) {
+            
assertThat(dataTable.options().get(option.getKey())).isEqualTo(option.getValue());
+        }
 
         // Create table throws Exception when table is system table
         assertThatExceptionOfType(IllegalArgumentException.class)
@@ -358,6 +425,20 @@ public abstract class CatalogTestBase {
                 .isThrownBy(
                         () -> 
catalog.getTable(Identifier.create("non_existing_db", "test_table")))
                 .withMessage("Table non_existing_db.test_table does not 
exist.");
+
+        // Get all table options from system database
+        if (!supportGetFromSystemDatabase()) {
+            return;
+        }
+        Table allTableOptionsTable =
+                catalog.getTable(Identifier.create(SYSTEM_DATABASE_NAME, 
ALL_TABLE_OPTIONS));
+        assertThat(allTableOptionsTable).isNotNull();
+        Table catalogOptionsTable =
+                catalog.getTable(Identifier.create(SYSTEM_DATABASE_NAME, 
CATALOG_OPTIONS));
+        assertThat(catalogOptionsTable).isNotNull();
+        assertThatExceptionOfType(Catalog.TableNotExistException.class)
+                .isThrownBy(
+                        () -> 
catalog.getTable(Identifier.create(SYSTEM_DATABASE_NAME, "1111")));
     }
 
     @Test
@@ -541,10 +622,7 @@ public abstract class CatalogTestBase {
                                         Lists.newArrayList(
                                                 
SchemaChange.renameColumn("col2", "new_col1")),
                                         false))
-                .satisfies(
-                        anyCauseMatches(
-                                Catalog.ColumnAlreadyExistException.class,
-                                "Column new_col1 already exists in the 
test_db.test_table table."));
+                .isInstanceOf(Catalog.ColumnAlreadyExistException.class);
 
         // Alter table renames a column throws ColumnNotExistException when 
column does not exist
         assertThatThrownBy(
@@ -555,10 +633,7 @@ public abstract class CatalogTestBase {
                                                 SchemaChange.renameColumn(
                                                         "non_existing_col", 
"new_col2")),
                                         false))
-                .satisfies(
-                        anyCauseMatches(
-                                Catalog.ColumnNotExistException.class,
-                                "Column non_existing_col does not exist in the 
test_db.test_table table."));
+                .isInstanceOf(Catalog.ColumnNotExistException.class);
     }
 
     @Test
@@ -839,10 +914,6 @@ public abstract class CatalogTestBase {
         assertThat(table.comment().isPresent()).isFalse();
     }
 
-    protected boolean supportsView() {
-        return false;
-    }
-
     @Test
     public void testView() throws Exception {
         if (!supportsView()) {
@@ -904,10 +975,6 @@ public abstract class CatalogTestBase {
                 .isInstanceOf(Catalog.ViewNotExistException.class);
     }
 
-    protected boolean supportsFormatTable() {
-        return false;
-    }
-
     @Test
     public void testFormatTable() throws Exception {
         if (!supportsFormatTable()) {
@@ -962,49 +1029,19 @@ public abstract class CatalogTestBase {
                 .isGreaterThan(0);
     }
 
-    protected void alterDatabaseWhenSupportAlter() throws Exception {
-        // Alter database
-        String databaseName = "db_to_alter";
-        catalog.createDatabase(databaseName, false);
-        String key = "key1";
-        String key2 = "key2";
-        // Add property
-        catalog.alterDatabase(
-                databaseName,
-                Lists.newArrayList(
-                        PropertyChange.setProperty(key, "value"),
-                        PropertyChange.setProperty(key2, "value")),
-                false);
-        Database db = catalog.getDatabase(databaseName);
-        assertEquals("value", db.options().get(key));
-        assertEquals("value", db.options().get(key2));
-        // Update property
-        catalog.alterDatabase(
-                databaseName,
-                Lists.newArrayList(
-                        PropertyChange.setProperty(key, "value1"),
-                        PropertyChange.setProperty(key2, "value1")),
-                false);
-        db = catalog.getDatabase(databaseName);
-        assertEquals("value1", db.options().get(key));
-        assertEquals("value1", db.options().get(key2));
-        // remove property
-        catalog.alterDatabase(
-                databaseName,
-                Lists.newArrayList(
-                        PropertyChange.removeProperty(key), 
PropertyChange.removeProperty(key2)),
-                false);
-        db = catalog.getDatabase(databaseName);
-        assertEquals(false, db.options().containsKey(key));
-        assertEquals(false, db.options().containsKey(key2));
-        // Remove non-existent property
-        catalog.alterDatabase(
-                databaseName,
-                Lists.newArrayList(
-                        PropertyChange.removeProperty(key), 
PropertyChange.removeProperty(key2)),
-                false);
-        db = catalog.getDatabase(databaseName);
-        assertEquals(false, db.options().containsKey(key));
-        assertEquals(false, db.options().containsKey(key2));
+    protected boolean supportGetFromSystemDatabase() {
+        return true;
+    }
+
+    protected boolean supportsAlterDatabase() {
+        return false;
+    }
+
+    protected boolean supportsFormatTable() {
+        return false;
+    }
+
+    protected boolean supportsView() {
+        return false;
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java 
b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
index 51e2bf5c77..0dea920903 100644
--- a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
@@ -117,8 +117,8 @@ public class JdbcCatalogTest extends CatalogTestBase {
                 });
     }
 
-    @Test
-    public void testAlterDatabase() throws Exception {
-        this.alterDatabaseWhenSupportAlter();
+    @Override
+    protected boolean supportsAlterDatabase() {
+        return true;
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/DefaultErrorHandlerTest.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/DefaultErrorHandlerTest.java
index 340e38f6a7..0de7bf05d8 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/rest/DefaultErrorHandlerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/rest/DefaultErrorHandlerTest.java
@@ -70,7 +70,7 @@ public class DefaultErrorHandlerTest {
                 ServiceFailureException.class,
                 () -> defaultErrorHandler.accept(generateErrorResponse(500)));
         assertThrows(
-                UnsupportedOperationException.class,
+                
org.apache.paimon.rest.exceptions.UnsupportedOperationException.class,
                 () -> defaultErrorHandler.accept(generateErrorResponse(501)));
         assertThrows(
                 RESTException.class, () -> 
defaultErrorHandler.accept(generateErrorResponse(502)));
@@ -80,6 +80,6 @@ public class DefaultErrorHandlerTest {
     }
 
     private ErrorResponse generateErrorResponse(int code) {
-        return new ErrorResponse("message", code, new ArrayList<String>());
+        return new ErrorResponse(null, null, "message", code, new 
ArrayList<String>());
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java
index 3baff1ccaa..54e7d3a68e 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java
@@ -20,6 +20,9 @@ package org.apache.paimon.rest;
 
 import org.apache.paimon.rest.auth.BearTokenCredentialsProvider;
 import org.apache.paimon.rest.auth.CredentialsProvider;
+import org.apache.paimon.rest.exceptions.BadRequestException;
+import org.apache.paimon.rest.responses.ErrorResponse;
+import org.apache.paimon.rest.responses.ErrorResponseResourceType;
 
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
@@ -31,28 +34,26 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.time.Duration;
-import java.util.HashMap;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
+import static org.junit.Assert.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 
 /** Test for {@link HttpClient}. */
 public class HttpClientTest {
 
     private static final String MOCK_PATH = "/v1/api/mock";
     private static final String TOKEN = "token";
-
-    private final ObjectMapper objectMapper = RESTObjectMapper.create();
+    private static final ObjectMapper OBJECT_MAPPER = 
RESTObjectMapper.create();
 
     private MockWebServer mockWebServer;
     private HttpClient httpClient;
     private ErrorHandler errorHandler;
     private MockRESTData mockResponseData;
     private String mockResponseDataStr;
+    private ErrorResponse errorResponse;
+    private String errorResponseStr;
     private Map<String, String> headers;
 
     @Before
@@ -60,11 +61,13 @@ public class HttpClientTest {
         mockWebServer = new MockWebServer();
         mockWebServer.start();
         String baseUrl = mockWebServer.url("").toString();
-        errorHandler = mock(ErrorHandler.class);
+        errorHandler = DefaultErrorHandler.getInstance();
         HttpClientOptions httpClientOptions =
                 new HttpClientOptions(baseUrl, Duration.ofSeconds(3), 
Duration.ofSeconds(3), 1);
         mockResponseData = new MockRESTData(MOCK_PATH);
-        mockResponseDataStr = 
objectMapper.writeValueAsString(mockResponseData);
+        mockResponseDataStr = 
OBJECT_MAPPER.writeValueAsString(mockResponseData);
+        errorResponse = new ErrorResponse(ErrorResponseResourceType.DATABASE, 
"test", "test", 400);
+        errorResponseStr = OBJECT_MAPPER.writeValueAsString(errorResponse);
         httpClient = new HttpClient(httpClientOptions);
         httpClient.setErrorHandler(errorHandler);
         CredentialsProvider credentialsProvider = new 
BearTokenCredentialsProvider(TOKEN);
@@ -80,15 +83,15 @@ public class HttpClientTest {
     public void testGetSuccess() {
         mockHttpCallWithCode(mockResponseDataStr, 200);
         MockRESTData response = httpClient.get(MOCK_PATH, MockRESTData.class, 
headers);
-        verify(errorHandler, times(0)).accept(any());
         assertEquals(mockResponseData.data(), response.data());
     }
 
     @Test
     public void testGetFail() {
-        mockHttpCallWithCode(mockResponseDataStr, 400);
-        httpClient.get(MOCK_PATH, MockRESTData.class, headers);
-        verify(errorHandler, times(1)).accept(any());
+        mockHttpCallWithCode(errorResponseStr, 400);
+        assertThrows(
+                BadRequestException.class,
+                () -> httpClient.get(MOCK_PATH, MockRESTData.class, headers));
     }
 
     @Test
@@ -96,35 +99,27 @@ public class HttpClientTest {
         mockHttpCallWithCode(mockResponseDataStr, 200);
         MockRESTData response =
                 httpClient.post(MOCK_PATH, mockResponseData, 
MockRESTData.class, headers);
-        verify(errorHandler, times(0)).accept(any());
         assertEquals(mockResponseData.data(), response.data());
     }
 
     @Test
     public void testPostFail() {
-        mockHttpCallWithCode(mockResponseDataStr, 400);
-        httpClient.post(MOCK_PATH, mockResponseData, MockRESTData.class, 
headers);
-        verify(errorHandler, times(1)).accept(any());
+        mockHttpCallWithCode(errorResponseStr, 400);
+        assertThrows(
+                BadRequestException.class,
+                () -> httpClient.post(MOCK_PATH, mockResponseData, 
ErrorResponse.class, headers));
     }
 
     @Test
     public void testDeleteSuccess() {
         mockHttpCallWithCode(mockResponseDataStr, 200);
-        MockRESTData response = httpClient.delete(MOCK_PATH, headers);
-        verify(errorHandler, times(0)).accept(any());
+        assertDoesNotThrow(() -> httpClient.delete(MOCK_PATH, headers));
     }
 
     @Test
     public void testDeleteFail() {
-        mockHttpCallWithCode(mockResponseDataStr, 400);
-        httpClient.delete(MOCK_PATH, headers);
-        verify(errorHandler, times(1)).accept(any());
-    }
-
-    private Map<String, String> headers(String token) {
-        Map<String, String> header = new HashMap<>();
-        header.put("Authorization", "Bearer " + token);
-        return header;
+        mockHttpCallWithCode(errorResponseStr, 400);
+        assertThrows(BadRequestException.class, () -> 
httpClient.delete(MOCK_PATH, headers));
     }
 
     private void mockHttpCallWithCode(String body, Integer code) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java
index 4b228d93c6..58a73bbfcb 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java
@@ -30,7 +30,6 @@ import org.apache.paimon.rest.requests.DropPartitionRequest;
 import org.apache.paimon.rest.requests.RenameTableRequest;
 import org.apache.paimon.rest.responses.AlterDatabaseResponse;
 import org.apache.paimon.rest.responses.CreateDatabaseResponse;
-import org.apache.paimon.rest.responses.ErrorResponse;
 import org.apache.paimon.rest.responses.GetDatabaseResponse;
 import org.apache.paimon.rest.responses.GetTableResponse;
 import org.apache.paimon.rest.responses.ListDatabasesResponse;
@@ -48,6 +47,8 @@ import org.apache.paimon.types.RowType;
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
 import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
 
+import okhttp3.mockwebserver.MockResponse;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -89,10 +90,6 @@ public class MockRESTMessage {
         return new ListDatabasesResponse(databaseNameList);
     }
 
-    public static ErrorResponse noSuchResourceExceptionErrorResponse() {
-        return new ErrorResponse("message", 404, new ArrayList<>());
-    }
-
     public static AlterDatabaseRequest alterDatabaseRequest() {
         Map<String, String> add = new HashMap<>();
         add.put("add", "value");
@@ -243,6 +240,13 @@ public class MockRESTMessage {
         return new GetTableResponse("/tmp/1", 1, schema(options));
     }
 
+    public static MockResponse mockResponse(String body, int httpCode) {
+        return new MockResponse()
+                .setResponseCode(httpCode)
+                .setBody(body)
+                .addHeader("Content-Type", "application/json");
+    }
+
     private static Schema schema(Map<String, String> options) {
         List<DataField> fields =
                 Arrays.asList(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
new file mode 100644
index 0000000000..4fe2029113
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest;
+
+import org.apache.paimon.catalog.AbstractCatalog;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Database;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.Partition;
+import org.apache.paimon.rest.requests.AlterTableRequest;
+import org.apache.paimon.rest.requests.CreateDatabaseRequest;
+import org.apache.paimon.rest.requests.CreateTableRequest;
+import org.apache.paimon.rest.requests.RenameTableRequest;
+import org.apache.paimon.rest.responses.CreateDatabaseResponse;
+import org.apache.paimon.rest.responses.ErrorResponse;
+import org.apache.paimon.rest.responses.ErrorResponseResourceType;
+import org.apache.paimon.rest.responses.GetDatabaseResponse;
+import org.apache.paimon.rest.responses.GetTableResponse;
+import org.apache.paimon.rest.responses.ListDatabasesResponse;
+import org.apache.paimon.rest.responses.ListPartitionsResponse;
+import org.apache.paimon.rest.responses.ListTablesResponse;
+import org.apache.paimon.table.FileStoreTable;
+
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import okhttp3.mockwebserver.Dispatcher;
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import okhttp3.mockwebserver.RecordedRequest;
+
+import java.io.IOException;
+import java.util.List;
+
+/** Mock REST server for testing. */
+public class RESTCatalogServer {
+
+    private static final ObjectMapper OBJECT_MAPPER = 
RESTObjectMapper.create();
+    private static final String PREFIX = "paimon";
+    private static final String DATABASE_URI = 
String.format("/v1/%s/databases", PREFIX);
+
+    private final Catalog catalog;
+    private final Dispatcher dispatcher;
+    private final MockWebServer server;
+    private final String authToken;
+
+    public RESTCatalogServer(String warehouse, String initToken) {
+        authToken = initToken;
+        Options conf = new Options();
+        conf.setString("warehouse", warehouse);
+        this.catalog =
+                CatalogFactory.createCatalog(
+                        CatalogContext.create(conf), 
this.getClass().getClassLoader());
+        this.dispatcher = initDispatcher(catalog, authToken);
+        MockWebServer mockWebServer = new MockWebServer();
+        mockWebServer.setDispatcher(dispatcher);
+        server = mockWebServer;
+    }
+
+    public void start() throws IOException {
+        server.start();
+    }
+
+    public String getUrl() {
+        return server.url("").toString();
+    }
+
+    public void shutdown() throws IOException {
+        server.shutdown();
+    }
+
+    public static Dispatcher initDispatcher(Catalog catalog, String authToken) 
{
+        return new Dispatcher() {
+            @Override
+            public MockResponse dispatch(RecordedRequest request) {
+                String token = request.getHeaders().get("Authorization");
+                RESTResponse response;
+                try {
+                    if (!("Bearer " + authToken).equals(token)) {
+                        return new MockResponse().setResponseCode(401);
+                    }
+                    if ("/v1/config".equals(request.getPath())) {
+                        return new MockResponse()
+                                .setResponseCode(200)
+                                .setBody(getConfigBody(catalog.warehouse()));
+                    } else if (DATABASE_URI.equals(request.getPath())) {
+                        return databasesApiHandler(catalog, request);
+                    } else if (request.getPath().startsWith(DATABASE_URI)) {
+                        String[] resources =
+                                request.getPath()
+                                        .substring((DATABASE_URI + 
"/").length())
+                                        .split("/");
+                        String databaseName = resources[0];
+                        boolean isTables = resources.length == 2 && 
"tables".equals(resources[1]);
+                        boolean isTable = resources.length == 3 && 
"tables".equals(resources[1]);
+                        boolean isTableRename =
+                                resources.length == 4 && 
"rename".equals(resources[3]);
+                        boolean isPartitions =
+                                resources.length == 4
+                                        && "tables".equals(resources[1])
+                                        && "partitions".equals(resources[3]);
+                        if (isPartitions) {
+                            String tableName = resources[2];
+                            List<Partition> partitions =
+                                    catalog.listPartitions(
+                                            Identifier.create(databaseName, 
tableName));
+                            response = new ListPartitionsResponse(partitions);
+                            return mockResponse(response, 200);
+                        } else if (isTableRename) {
+                            return renameTableApiHandler(
+                                    catalog, request, databaseName, 
resources[2]);
+                        } else if (isTable) {
+                            String tableName = resources[2];
+                            return tableApiHandler(catalog, request, 
databaseName, tableName);
+                        } else if (isTables) {
+                            return tablesApiHandler(catalog, request, 
databaseName);
+                        } else {
+                            return databaseApiHandler(catalog, request, 
databaseName);
+                        }
+                    }
+                    return new MockResponse().setResponseCode(404);
+                } catch (Catalog.DatabaseNotExistException e) {
+                    response =
+                            new ErrorResponse(
+                                    ErrorResponseResourceType.DATABASE,
+                                    e.database(),
+                                    e.getMessage(),
+                                    404);
+                    return mockResponse(response, 404);
+                } catch (Catalog.TableNotExistException e) {
+                    response =
+                            new ErrorResponse(
+                                    ErrorResponseResourceType.TABLE,
+                                    e.identifier().getTableName(),
+                                    e.getMessage(),
+                                    404);
+                    return mockResponse(response, 404);
+                } catch (Catalog.ColumnNotExistException e) {
+                    response =
+                            new ErrorResponse(
+                                    ErrorResponseResourceType.COLUMN,
+                                    e.column(),
+                                    e.getMessage(),
+                                    404);
+                    return mockResponse(response, 404);
+                } catch (Catalog.DatabaseAlreadyExistException e) {
+                    response =
+                            new ErrorResponse(
+                                    ErrorResponseResourceType.DATABASE,
+                                    e.database(),
+                                    e.getMessage(),
+                                    409);
+                    return mockResponse(response, 409);
+                } catch (Catalog.TableAlreadyExistException e) {
+                    response =
+                            new ErrorResponse(
+                                    ErrorResponseResourceType.TABLE,
+                                    e.identifier().getTableName(),
+                                    e.getMessage(),
+                                    409);
+                    return mockResponse(response, 409);
+                } catch (Catalog.ColumnAlreadyExistException e) {
+                    response =
+                            new ErrorResponse(
+                                    ErrorResponseResourceType.COLUMN,
+                                    e.column(),
+                                    e.getMessage(),
+                                    409);
+                    return mockResponse(response, 409);
+                } catch (IllegalArgumentException e) {
+                    response = new ErrorResponse(null, null, e.getMessage(), 
400);
+                    return mockResponse(response, 400);
+                } catch (Exception e) {
+                    if (e.getCause() instanceof IllegalArgumentException) {
+                        response =
+                                new ErrorResponse(
+                                        null, null, 
e.getCause().getCause().getMessage(), 400);
+                        return mockResponse(response, 400);
+                    } else if (e instanceof UnsupportedOperationException) {
+                        response = new ErrorResponse(null, null, 
e.getMessage(), 501);
+                        return mockResponse(response, 501);
+                    } else if (e instanceof IllegalStateException) {
+                        response = new ErrorResponse(null, null, 
e.getMessage(), 500);
+                        return mockResponse(response, 500);
+                    }
+                    return new MockResponse().setResponseCode(500);
+                }
+            }
+        };
+    }
+
+    /**
+     * Handles a rename request for the table {@code databaseName.tableName}.
+     *
+     * <p>The target identifier is read from the JSON request body. After the rename the table is
+     * loaded again under its new identifier and described in the response (location derived from
+     * the catalog warehouse, schema id, schema) with HTTP status 200.
+     *
+     * @throws Exception if the body cannot be parsed or the catalog rejects the rename or lookup
+     */
+    private static MockResponse renameTableApiHandler(
+            Catalog catalog, RecordedRequest request, String databaseName, String tableName)
+            throws Exception {
+        String payload = request.getBody().readUtf8();
+        Identifier target =
+                OBJECT_MAPPER.readValue(payload, RenameTableRequest.class).getNewIdentifier();
+        Identifier source = Identifier.create(databaseName, tableName);
+        catalog.renameTable(source, target, false);
+
+        // Re-read the table under its new name so the response reflects the renamed table.
+        FileStoreTable renamed = (FileStoreTable) catalog.getTable(target);
+        String location =
+                AbstractCatalog.newTableLocation(catalog.warehouse(), target).toString();
+        return mockResponse(
+                new GetTableResponse(location, renamed.schema().id(), renamed.schema().toSchema()),
+                200);
+    }
+
+    /**
+     * Serves the databases collection endpoint.
+     *
+     * <p>{@code GET} lists all database names. {@code POST} creates the database described by the
+     * JSON request body, forwarding the requested options to the catalog, and echoes name and
+     * options back. Any other method is answered with 404.
+     *
+     * @throws Exception if the body cannot be parsed or the catalog call fails
+     */
+    private static MockResponse databasesApiHandler(Catalog catalog, RecordedRequest request)
+            throws Exception {
+        String method = request.getMethod();
+        if ("GET".equals(method)) {
+            return mockResponse(new ListDatabasesResponse(catalog.listDatabases()), 200);
+        }
+        if ("POST".equals(method)) {
+            CreateDatabaseRequest requestBody =
+                    OBJECT_MAPPER.readValue(
+                            request.getBody().readUtf8(), CreateDatabaseRequest.class);
+            String databaseName = requestBody.getName();
+            // Pass the options through so the response never reports options that were dropped.
+            if (requestBody.getOptions() == null) {
+                catalog.createDatabase(databaseName, false);
+            } else {
+                catalog.createDatabase(databaseName, false, requestBody.getOptions());
+            }
+            return mockResponse(
+                    new CreateDatabaseResponse(databaseName, requestBody.getOptions()), 200);
+        }
+        return new MockResponse().setResponseCode(404);
+    }
+
+    /**
+     * Serves the single-database endpoint for {@code databaseName}.
+     *
+     * <p>{@code GET} returns the database's name and options; {@code DELETE} drops the database
+     * (not ignoring a missing database, cascading). Any other method is answered with 404.
+     *
+     * @throws Exception if the catalog lookup or drop fails
+     */
+    private static MockResponse databaseApiHandler(
+            Catalog catalog, RecordedRequest request, String databaseName) throws Exception {
+        switch (request.getMethod()) {
+            case "GET":
+                Database found = catalog.getDatabase(databaseName);
+                return mockResponse(
+                        new GetDatabaseResponse(found.name(), found.options()), 200);
+            case "DELETE":
+                catalog.dropDatabase(databaseName, false, true);
+                return new MockResponse().setResponseCode(200);
+            default:
+                return new MockResponse().setResponseCode(404);
+        }
+    }
+
+    /**
+     * Serves the tables collection endpoint of {@code databaseName}.
+     *
+     * <p>{@code POST} creates the table described by the JSON request body; {@code GET} lists the
+     * table names of the database. Any other method is answered with 404.
+     *
+     * @throws Exception if the body cannot be parsed or the catalog call fails
+     */
+    private static MockResponse tablesApiHandler(
+            Catalog catalog, RecordedRequest request, String databaseName) throws Exception {
+        String method = request.getMethod();
+        if ("POST".equals(method)) {
+            CreateTableRequest requestBody =
+                    OBJECT_MAPPER.readValue(request.getBody().readUtf8(), CreateTableRequest.class);
+            catalog.createTable(requestBody.getIdentifier(), requestBody.getSchema(), false);
+            // NOTE(review): the empty path and schema id 1L are fixed placeholders, not values
+            // read back from the created table - confirm no client relies on them.
+            return mockResponse(new GetTableResponse("", 1L, requestBody.getSchema()), 200);
+        }
+        if ("GET".equals(method)) {
+            // A single listing call is enough; its result is the whole response.
+            return mockResponse(new ListTablesResponse(catalog.listTables(databaseName)), 200);
+        }
+        return new MockResponse().setResponseCode(404);
+    }
+
+    /**
+     * Serves the single-table endpoint for {@code databaseName.tableName}.
+     *
+     * <p>{@code GET} returns the table's location, schema id and schema; {@code POST} applies the
+     * schema changes from the JSON request body and returns the same description of the altered
+     * table; {@code DELETE} drops the table. Any other method is answered with 404.
+     *
+     * @throws Exception if the body cannot be parsed or the catalog call fails
+     */
+    private static MockResponse tableApiHandler(
+            Catalog catalog, RecordedRequest request, String databaseName, String tableName)
+            throws Exception {
+        Identifier identifier = Identifier.create(databaseName, tableName);
+        String method = request.getMethod();
+        if ("GET".equals(method)) {
+            return mockResponse(currentTableResponse(catalog, identifier), 200);
+        }
+        if ("POST".equals(method)) {
+            AlterTableRequest requestBody =
+                    OBJECT_MAPPER.readValue(request.getBody().readUtf8(), AlterTableRequest.class);
+            catalog.alterTable(identifier, requestBody.getChanges(), false);
+            // Report the real location, as GET and the rename handler do, not an empty path.
+            return mockResponse(currentTableResponse(catalog, identifier), 200);
+        }
+        if ("DELETE".equals(method)) {
+            catalog.dropTable(identifier, false);
+            return new MockResponse().setResponseCode(200);
+        }
+        return new MockResponse().setResponseCode(404);
+    }
+
+    /** Loads the table and describes it by warehouse-derived location, schema id and schema. */
+    private static GetTableResponse currentTableResponse(Catalog catalog, Identifier identifier)
+            throws Exception {
+        FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+        return new GetTableResponse(
+                AbstractCatalog.newTableLocation(catalog.warehouse(), identifier).toString(),
+                table.schema().id(),
+                table.schema().toSchema());
+    }
+
+    /**
+     * Builds a JSON mock response carrying {@code response} serialized with the shared object
+     * mapper, the given HTTP status code and an {@code application/json} content type.
+     *
+     * @throws RuntimeException wrapping the serialization failure if the body cannot be written
+     */
+    private static MockResponse mockResponse(RESTResponse response, int httpCode) {
+        final String json;
+        try {
+            json = OBJECT_MAPPER.writeValueAsString(response);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
+        }
+        return new MockResponse()
+                .setResponseCode(httpCode)
+                .setBody(json)
+                .addHeader("Content-Type", "application/json");
+    }
+
+    /**
+     * Renders the JSON body of the config response: a {@code defaults} object holding the REST
+     * prefix option and the warehouse option.
+     *
+     * <p>NOTE(review): values are inserted as-is without JSON escaping, so this presumably expects
+     * a warehouse string free of quotes and backslashes - confirm against callers.
+     */
+    private static String getConfigBody(String warehouseStr) {
+        String prefixKey = RESTCatalogInternalOptions.PREFIX.key();
+        String warehouseKey = CatalogOptions.WAREHOUSE.key();
+        String template = "{\"defaults\": {\"%s\": \"%s\", \"%s\": \"%s\"}}";
+        return String.format(template, prefixKey, PREFIX, warehouseKey, warehouseStr);
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index 344807b4c9..b34ca1e5ac 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -18,352 +18,107 @@
 
 package org.apache.paimon.rest;
 
-import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.catalog.Database;
+import org.apache.paimon.catalog.CatalogTestBase;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.Partition;
-import org.apache.paimon.rest.requests.CreateTableRequest;
-import org.apache.paimon.rest.responses.AlterDatabaseResponse;
-import org.apache.paimon.rest.responses.CreateDatabaseResponse;
-import org.apache.paimon.rest.responses.ErrorResponse;
-import org.apache.paimon.rest.responses.GetDatabaseResponse;
-import org.apache.paimon.rest.responses.GetTableResponse;
-import org.apache.paimon.rest.responses.ListDatabasesResponse;
-import org.apache.paimon.rest.responses.ListPartitionsResponse;
-import org.apache.paimon.rest.responses.ListTablesResponse;
-import org.apache.paimon.schema.SchemaChange;
-import org.apache.paimon.table.Table;
+import org.apache.paimon.rest.exceptions.NotAuthorizedException;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
 
-import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
-import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;
 
-import okhttp3.mockwebserver.MockResponse;
-import okhttp3.mockwebserver.MockWebServer;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
-import java.io.IOException;
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /** Test for REST Catalog. */
-public class RESTCatalogTest {
+class RESTCatalogTest extends CatalogTestBase {
 
-    private final ObjectMapper mapper = RESTObjectMapper.create();
-    private MockWebServer mockWebServer;
-    private RESTCatalog restCatalog;
-    private String warehouseStr;
-    private String serverUrl;
-    @Rule public TemporaryFolder folder = new TemporaryFolder();
+    private RESTCatalogServer restCatalogServer;
 
-    @Before
-    public void setUp() throws IOException {
-        mockWebServer = new MockWebServer();
-        mockWebServer.start();
-        serverUrl = mockWebServer.url("").toString();
-        Options options = mockInitOptions();
-        warehouseStr = folder.getRoot().getPath();
-        mockConfig(warehouseStr);
-        restCatalog = new RESTCatalog(CatalogContext.create(options));
-    }
-
-    @After
-    public void tearDown() throws IOException {
-        mockWebServer.shutdown();
-    }
-
-    @Test
-    public void testInitFailWhenDefineWarehouse() {
+    @BeforeEach
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        String initToken = "init_token";
+        restCatalogServer = new RESTCatalogServer(warehouse, initToken);
+        restCatalogServer.start();
         Options options = new Options();
-        options.set(CatalogOptions.WAREHOUSE, warehouseStr);
-        assertThrows(
-                IllegalArgumentException.class,
-                () -> new RESTCatalog(CatalogContext.create(options)));
-    }
-
-    @Test
-    public void testListDatabases() throws JsonProcessingException {
-        String name = MockRESTMessage.databaseName();
-        ListDatabasesResponse response = 
MockRESTMessage.listDatabasesResponse(name);
-        mockResponse(mapper.writeValueAsString(response), 200);
-        List<String> result = restCatalog.listDatabases();
-        assertEquals(response.getDatabases().size(), result.size());
-        assertEquals(name, result.get(0));
-    }
-
-    @Test
-    public void testCreateDatabase() throws Exception {
-        String name = MockRESTMessage.databaseName();
-        CreateDatabaseResponse response = 
MockRESTMessage.createDatabaseResponse(name);
-        mockResponse(mapper.writeValueAsString(response), 200);
-        assertDoesNotThrow(() -> restCatalog.createDatabase(name, false, 
response.getOptions()));
-    }
-
-    @Test
-    public void testGetDatabase() throws Exception {
-        String name = MockRESTMessage.databaseName();
-        GetDatabaseResponse response = 
MockRESTMessage.getDatabaseResponse(name);
-        mockResponse(mapper.writeValueAsString(response), 200);
-        Database result = restCatalog.getDatabase(name);
-        assertEquals(name, result.name());
-        assertEquals(response.getOptions().size(), result.options().size());
-        assertEquals(response.comment().get(), result.comment().get());
+        options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl());
+        options.set(RESTCatalogOptions.TOKEN, initToken);
+        options.set(RESTCatalogOptions.THREAD_POOL_SIZE, 1);
+        this.catalog = new RESTCatalog(CatalogContext.create(options));
     }
 
-    @Test
-    public void testDropDatabase() throws Exception {
-        String name = MockRESTMessage.databaseName();
-        mockResponse("", 200);
-        assertDoesNotThrow(() -> restCatalog.dropDatabase(name, false, true));
+    @AfterEach
+    public void tearDown() throws Exception {
+        restCatalogServer.shutdown();
     }
 
-    @Test
-    public void testDropDatabaseWhenNoExistAndIgnoreIfNotExistsIsFalse() 
throws Exception {
-        String name = MockRESTMessage.databaseName();
-        ErrorResponse response = 
MockRESTMessage.noSuchResourceExceptionErrorResponse();
-        mockResponse(mapper.writeValueAsString(response), 404);
-        assertThrows(
-                Catalog.DatabaseNotExistException.class,
-                () -> restCatalog.dropDatabase(name, false, true));
+    @Override
+    protected boolean supportGetFromSystemDatabase() {
+        return false;
     }
 
     @Test
-    public void testDropDatabaseWhenNoExistAndIgnoreIfNotExistsIsTrue() throws 
Exception {
-        String name = MockRESTMessage.databaseName();
-        ErrorResponse response = 
MockRESTMessage.noSuchResourceExceptionErrorResponse();
-        mockResponse(mapper.writeValueAsString(response), 404);
-        assertDoesNotThrow(() -> restCatalog.dropDatabase(name, true, true));
+    void testInitFailWhenDefineWarehouse() {
+        Options options = new Options();
+        options.set(CatalogOptions.WAREHOUSE, warehouse);
+        assertThatThrownBy(() -> new 
RESTCatalog(CatalogContext.create(options)))
+                .isInstanceOf(IllegalArgumentException.class);
     }
 
     @Test
-    public void testDropDatabaseWhenCascadeIsFalseAndNoTables() throws 
Exception {
-        String name = MockRESTMessage.databaseName();
-        boolean cascade = false;
-        ListTablesResponse response = 
MockRESTMessage.listTablesEmptyResponse();
-        mockResponse(mapper.writeValueAsString(response), 200);
-        mockResponse("", 200);
-        assertDoesNotThrow(() -> restCatalog.dropDatabase(name, false, 
cascade));
+    void testAuthFail() {
+        Options options = new Options();
+        options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl());
+        options.set(RESTCatalogOptions.TOKEN, "aaaaa");
+        options.set(RESTCatalogOptions.THREAD_POOL_SIZE, 1);
+        options.set(CatalogOptions.METASTORE, RESTCatalogFactory.IDENTIFIER);
+        assertThatThrownBy(() -> new 
RESTCatalog(CatalogContext.create(options)))
+                .isInstanceOf(NotAuthorizedException.class);
     }
 
     @Test
-    public void testDropDatabaseWhenCascadeIsFalseAndTablesExist() throws Exception {
-        String name = MockRESTMessage.databaseName();
-        boolean cascade = false;
-        ListTablesResponse response = MockRESTMessage.listTablesResponse();
-        mockResponse(mapper.writeValueAsString(response), 200);
-        assertThrows(
-                Catalog.DatabaseNotEmptyException.class,
-                () -> restCatalog.dropDatabase(name, false, cascade));
+    void testListPartitionsWhenMetastorePartitionedIsTrue() throws Exception {
+        Identifier identifier = Identifier.create("test_db", "test_table");
+        // Turn on metastore-managed partitions; with empty options this test would be an exact
+        // duplicate of testListPartitionsFromFile.
+        Map<String, String> options = Maps.newHashMap();
+        options.put(org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE.key(), "true");
+        createTable(identifier, options, Lists.newArrayList("col1"));
+        List<Partition> result = catalog.listPartitions(identifier);
+        assertEquals(0, result.size());
     }
 
     @Test
-    public void testAlterDatabase() throws Exception {
-        String name = MockRESTMessage.databaseName();
-        AlterDatabaseResponse response = 
MockRESTMessage.alterDatabaseResponse();
-        mockResponse(mapper.writeValueAsString(response), 200);
-        assertDoesNotThrow(() -> restCatalog.alterDatabase(name, new 
ArrayList<>(), true));
+    void testListPartitionsFromFile() throws Exception {
+        Identifier identifier = Identifier.create("test_db", "test_table");
+        createTable(identifier, Maps.newHashMap(), Lists.newArrayList("col1"));
+        List<Partition> result = catalog.listPartitions(identifier);
+        assertEquals(0, result.size());
     }
 
-    @Test
-    public void 
testAlterDatabaseWhenDatabaseNotExistAndIgnoreIfNotExistsIsFalse()
+    private void createTable(
+            Identifier identifier, Map<String, String> options, List<String> 
partitionKeys)
             throws Exception {
-        String name = MockRESTMessage.databaseName();
-        ErrorResponse response = 
MockRESTMessage.noSuchResourceExceptionErrorResponse();
-        mockResponse(mapper.writeValueAsString(response), 404);
-        assertThrows(
-                Catalog.DatabaseNotExistException.class,
-                () -> restCatalog.alterDatabase(name, new ArrayList<>(), 
false));
-    }
-
-    @Test
-    public void 
testAlterDatabaseWhenDatabaseNotExistAndIgnoreIfNotExistsIsTrue() throws 
Exception {
-        String name = MockRESTMessage.databaseName();
-        ErrorResponse response = 
MockRESTMessage.noSuchResourceExceptionErrorResponse();
-        mockResponse(mapper.writeValueAsString(response), 404);
-        assertDoesNotThrow(() -> restCatalog.alterDatabase(name, new 
ArrayList<>(), true));
-    }
-
-    @Test
-    public void testListTables() throws Exception {
-        String databaseName = MockRESTMessage.databaseName();
-        ListTablesResponse response = MockRESTMessage.listTablesResponse();
-        mockResponse(mapper.writeValueAsString(response), 200);
-        List<String> result = restCatalog.listTables(databaseName);
-        assertEquals(response.getTables().size(), result.size());
-    }
-
-    @Test
-    public void testGetTable() throws Exception {
-        String databaseName = MockRESTMessage.databaseName();
-        GetTableResponse response = MockRESTMessage.getTableResponse();
-        mockResponse(mapper.writeValueAsString(response), 200);
-        Table result = restCatalog.getTable(Identifier.create(databaseName, 
"table"));
-        assertEquals(response.getSchema().options().size() + 1, 
result.options().size());
-    }
-
-    @Test
-    public void testCreateTable() throws Exception {
-        CreateTableRequest request = 
MockRESTMessage.createTableRequest("table");
-        GetTableResponse response = MockRESTMessage.getTableResponse();
-        mockResponse(mapper.writeValueAsString(response), 200);
-        assertDoesNotThrow(
-                () -> restCatalog.createTable(request.getIdentifier(), 
request.getSchema(), false));
-    }
-
-    @Test
-    public void testCreateTableWhenTableAlreadyExistAndIgnoreIfExistsIsFalse() 
throws Exception {
-        CreateTableRequest request = 
MockRESTMessage.createTableRequest("table");
-        mockResponse("", 409);
-        assertThrows(
-                Catalog.TableAlreadyExistException.class,
-                () -> restCatalog.createTable(request.getIdentifier(), 
request.getSchema(), false));
-    }
-
-    @Test
-    public void testRenameTable() throws Exception {
-        String databaseName = MockRESTMessage.databaseName();
-        String fromTableName = "fromTable";
-        String toTableName = "toTable";
-        GetTableResponse response = MockRESTMessage.getTableResponse();
-        mockResponse(mapper.writeValueAsString(response), 200);
-        assertDoesNotThrow(
-                () ->
-                        restCatalog.renameTable(
-                                Identifier.create(databaseName, fromTableName),
-                                Identifier.create(databaseName, toTableName),
-                                true));
-    }
-
-    @Test
-    public void testRenameTableWhenTableNotExistAndIgnoreIfNotExistsIsFalse() 
throws Exception {
-        String databaseName = MockRESTMessage.databaseName();
-        String fromTableName = "fromTable";
-        String toTableName = "toTable";
-        mockResponse("", 404);
-        assertThrows(
-                Catalog.TableNotExistException.class,
-                () ->
-                        restCatalog.renameTable(
-                                Identifier.create(databaseName, fromTableName),
-                                Identifier.create(databaseName, toTableName),
-                                false));
-    }
-
-    @Test
-    public void testRenameTableWhenToTableAlreadyExist() throws Exception {
-        String databaseName = MockRESTMessage.databaseName();
-        String fromTableName = "fromTable";
-        String toTableName = "toTable";
-        mockResponse("", 409);
-        assertThrows(
-                Catalog.TableAlreadyExistException.class,
-                () ->
-                        restCatalog.renameTable(
-                                Identifier.create(databaseName, fromTableName),
-                                Identifier.create(databaseName, toTableName),
-                                false));
-    }
-
-    @Test
-    public void testAlterTable() throws Exception {
-        String databaseName = MockRESTMessage.databaseName();
-        List<SchemaChange> changes = MockRESTMessage.getChanges();
-        GetTableResponse response = MockRESTMessage.getTableResponse();
-        mockResponse(mapper.writeValueAsString(response), 200);
-        assertDoesNotThrow(
-                () -> restCatalog.alterTable(Identifier.create(databaseName, 
"t1"), changes, true));
-    }
-
-    @Test
-    public void testAlterTableWhenTableNotExistAndIgnoreIfNotExistsIsFalse() 
throws Exception {
-        String databaseName = MockRESTMessage.databaseName();
-        List<SchemaChange> changes = MockRESTMessage.getChanges();
-        mockResponse("", 404);
-        assertThrows(
-                Catalog.TableNotExistException.class,
-                () ->
-                        restCatalog.alterTable(
-                                Identifier.create(databaseName, "t1"), 
changes, false));
-    }
-
-    @Test
-    public void testDropTable() throws Exception {
-        String databaseName = MockRESTMessage.databaseName();
-        String tableName = "table";
-        mockResponse("", 200);
-        assertDoesNotThrow(
-                () -> restCatalog.dropTable(Identifier.create(databaseName, 
tableName), true));
-    }
-
-    @Test
-    public void testDropTableWhenTableNotExistAndIgnoreIfNotExistsIsFalse() 
throws Exception {
-        String databaseName = MockRESTMessage.databaseName();
-        String tableName = "table";
-        mockResponse("", 404);
-        assertThrows(
-                Catalog.TableNotExistException.class,
-                () -> restCatalog.dropTable(Identifier.create(databaseName, 
tableName), false));
-    }
-
-    @Test
-    public void testListPartitionsWhenMetastorePartitionedIsTrue() throws 
Exception {
-        String databaseName = MockRESTMessage.databaseName();
-        GetTableResponse getTableResponse = 
MockRESTMessage.getTableResponseEnablePartition();
-        mockResponse(mapper.writeValueAsString(getTableResponse), 200);
-        ListPartitionsResponse response = 
MockRESTMessage.listPartitionsResponse();
-        mockResponse(mapper.writeValueAsString(response), 200);
-        List<Partition> result =
-                restCatalog.listPartitions(Identifier.create(databaseName, 
"table"));
-        assertEquals(response.getPartitions().size(), result.size());
-    }
-
-    @Test
-    public void testListPartitionsFromFile() throws Exception {
-        String databaseName = MockRESTMessage.databaseName();
-        GetTableResponse response = 
MockRESTMessage.getTableResponseEnablePartition();
-        mockResponse(mapper.writeValueAsString(response), 200);
-        mockResponse(mapper.writeValueAsString(response), 200);
-        List<Partition> partitionEntries =
-                restCatalog.listPartitions(Identifier.create(databaseName, 
"table"));
-        assertEquals(partitionEntries.size(), 0);
-    }
-
-    private void mockResponse(String mockResponse, int httpCode) {
-        MockResponse mockResponseObj =
-                new MockResponse()
-                        .setResponseCode(httpCode)
-                        .setBody(mockResponse)
-                        .addHeader("Content-Type", "application/json");
-        mockWebServer.enqueue(mockResponseObj);
-    }
-
-    private void mockConfig(String warehouseStr) {
-        String mockResponse =
-                String.format(
-                        "{\"defaults\": {\"%s\": \"%s\", \"%s\": \"%s\"}}",
-                        RESTCatalogInternalOptions.PREFIX.key(),
-                        "prefix",
-                        CatalogOptions.WAREHOUSE.key(),
-                        warehouseStr);
-        mockResponse(mockResponse, 200);
-    }
-
-    public Options mockInitOptions() {
-        Options options = new Options();
-        options.set(RESTCatalogOptions.URI, serverUrl);
-        String initToken = "init_token";
-        options.set(RESTCatalogOptions.TOKEN, initToken);
-        options.set(RESTCatalogOptions.THREAD_POOL_SIZE, 1);
-        return options;
+        catalog.createDatabase(identifier.getDatabaseName(), false);
+        catalog.createTable(
+                identifier,
+                new Schema(
+                        Lists.newArrayList(new DataField(0, "col1", 
DataTypes.STRING())),
+                        partitionKeys,
+                        Collections.emptyList(),
+                        options,
+                        ""),
+                true);
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTObjectMapperTest.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTObjectMapperTest.java
index 38a6e08751..354efe69d5 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTObjectMapperTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTObjectMapperTest.java
@@ -53,7 +53,8 @@ import static 
org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 
 /** Test for {@link RESTObjectMapper}. */
 public class RESTObjectMapperTest {
-    private ObjectMapper mapper = RESTObjectMapper.create();
+
+    private static final ObjectMapper OBJECT_MAPPER = 
RESTObjectMapper.create();
 
     @Test
     public void configResponseParseTest() throws Exception {
@@ -61,8 +62,8 @@ public class RESTObjectMapperTest {
         Map<String, String> conf = new HashMap<>();
         conf.put(confKey, "b");
         ConfigResponse response = new ConfigResponse(conf, conf);
-        String responseStr = mapper.writeValueAsString(response);
-        ConfigResponse parseData = mapper.readValue(responseStr, 
ConfigResponse.class);
+        String responseStr = OBJECT_MAPPER.writeValueAsString(response);
+        ConfigResponse parseData = OBJECT_MAPPER.readValue(responseStr, 
ConfigResponse.class);
         assertEquals(conf.get(confKey), parseData.getDefaults().get(confKey));
     }
 
@@ -70,9 +71,10 @@ public class RESTObjectMapperTest {
     public void errorResponseParseTest() throws Exception {
         String message = "message";
         Integer code = 400;
-        ErrorResponse response = new ErrorResponse(message, code, new 
ArrayList<String>());
-        String responseStr = mapper.writeValueAsString(response);
-        ErrorResponse parseData = mapper.readValue(responseStr, 
ErrorResponse.class);
+        ErrorResponse response =
+                new ErrorResponse(null, null, message, code, new 
ArrayList<String>());
+        String responseStr = OBJECT_MAPPER.writeValueAsString(response);
+        ErrorResponse parseData = OBJECT_MAPPER.readValue(responseStr, 
ErrorResponse.class);
         assertEquals(message, parseData.getMessage());
         assertEquals(code, parseData.getCode());
     }
@@ -81,8 +83,9 @@ public class RESTObjectMapperTest {
     public void createDatabaseRequestParseTest() throws Exception {
         String name = MockRESTMessage.databaseName();
         CreateDatabaseRequest request = 
MockRESTMessage.createDatabaseRequest(name);
-        String requestStr = mapper.writeValueAsString(request);
-        CreateDatabaseRequest parseData = mapper.readValue(requestStr, 
CreateDatabaseRequest.class);
+        String requestStr = OBJECT_MAPPER.writeValueAsString(request);
+        CreateDatabaseRequest parseData =
+                OBJECT_MAPPER.readValue(requestStr, 
CreateDatabaseRequest.class);
         assertEquals(request.getName(), parseData.getName());
         assertEquals(request.getOptions().size(), 
parseData.getOptions().size());
     }
@@ -91,9 +94,9 @@ public class RESTObjectMapperTest {
     public void createDatabaseResponseParseTest() throws Exception {
         String name = MockRESTMessage.databaseName();
         CreateDatabaseResponse response = 
MockRESTMessage.createDatabaseResponse(name);
-        String responseStr = mapper.writeValueAsString(response);
+        String responseStr = OBJECT_MAPPER.writeValueAsString(response);
         CreateDatabaseResponse parseData =
-                mapper.readValue(responseStr, CreateDatabaseResponse.class);
+                OBJECT_MAPPER.readValue(responseStr, 
CreateDatabaseResponse.class);
         assertEquals(name, parseData.getName());
         assertEquals(response.getOptions().size(), 
parseData.getOptions().size());
     }
@@ -102,8 +105,9 @@ public class RESTObjectMapperTest {
     public void getDatabaseResponseParseTest() throws Exception {
         String name = MockRESTMessage.databaseName();
         GetDatabaseResponse response = 
MockRESTMessage.getDatabaseResponse(name);
-        String responseStr = mapper.writeValueAsString(response);
-        GetDatabaseResponse parseData = mapper.readValue(responseStr, 
GetDatabaseResponse.class);
+        String responseStr = OBJECT_MAPPER.writeValueAsString(response);
+        GetDatabaseResponse parseData =
+                OBJECT_MAPPER.readValue(responseStr, 
GetDatabaseResponse.class);
         assertEquals(name, parseData.getName());
         assertEquals(response.getOptions().size(), 
parseData.getOptions().size());
         assertEquals(response.comment().get(), parseData.comment().get());
@@ -113,9 +117,9 @@ public class RESTObjectMapperTest {
     public void listDatabaseResponseParseTest() throws Exception {
         String name = MockRESTMessage.databaseName();
         ListDatabasesResponse response = 
MockRESTMessage.listDatabasesResponse(name);
-        String responseStr = mapper.writeValueAsString(response);
+        String responseStr = OBJECT_MAPPER.writeValueAsString(response);
         ListDatabasesResponse parseData =
-                mapper.readValue(responseStr, ListDatabasesResponse.class);
+                OBJECT_MAPPER.readValue(responseStr, 
ListDatabasesResponse.class);
         assertEquals(response.getDatabases().size(), 
parseData.getDatabases().size());
         assertEquals(name, parseData.getDatabases().get(0));
     }
@@ -123,8 +127,9 @@ public class RESTObjectMapperTest {
     @Test
     public void alterDatabaseRequestParseTest() throws Exception {
         AlterDatabaseRequest request = MockRESTMessage.alterDatabaseRequest();
-        String requestStr = mapper.writeValueAsString(request);
-        AlterDatabaseRequest parseData = mapper.readValue(requestStr, 
AlterDatabaseRequest.class);
+        String requestStr = OBJECT_MAPPER.writeValueAsString(request);
+        AlterDatabaseRequest parseData =
+                OBJECT_MAPPER.readValue(requestStr, 
AlterDatabaseRequest.class);
         assertEquals(request.getRemovals().size(), 
parseData.getRemovals().size());
         assertEquals(request.getUpdates().size(), 
parseData.getUpdates().size());
     }
@@ -132,9 +137,9 @@ public class RESTObjectMapperTest {
     @Test
     public void alterDatabaseResponseParseTest() throws Exception {
         AlterDatabaseResponse response = 
MockRESTMessage.alterDatabaseResponse();
-        String responseStr = mapper.writeValueAsString(response);
+        String responseStr = OBJECT_MAPPER.writeValueAsString(response);
         AlterDatabaseResponse parseData =
-                mapper.readValue(responseStr, AlterDatabaseResponse.class);
+                OBJECT_MAPPER.readValue(responseStr, 
AlterDatabaseResponse.class);
         assertEquals(response.getRemoved().size(), 
parseData.getRemoved().size());
         assertEquals(response.getUpdated().size(), 
parseData.getUpdated().size());
         assertEquals(response.getMissing().size(), 
parseData.getMissing().size());
@@ -143,8 +148,9 @@ public class RESTObjectMapperTest {
     @Test
     public void createTableRequestParseTest() throws Exception {
         CreateTableRequest request = MockRESTMessage.createTableRequest("t1");
-        String requestStr = mapper.writeValueAsString(request);
-        CreateTableRequest parseData = mapper.readValue(requestStr, 
CreateTableRequest.class);
+        String requestStr = OBJECT_MAPPER.writeValueAsString(request);
+        CreateTableRequest parseData =
+                OBJECT_MAPPER.readValue(requestStr, CreateTableRequest.class);
         assertEquals(request.getIdentifier(), parseData.getIdentifier());
         assertEquals(request.getSchema(), parseData.getSchema());
     }
@@ -160,7 +166,7 @@ public class RESTObjectMapperTest {
                 String.format(
                         "{\"id\": %d,\"name\":\"%s\",\"type\":\"%s\", 
\"description\":\"%s\"}",
                         id, name, type, descStr);
-        DataField parseData = mapper.readValue(dataFieldStr, DataField.class);
+        DataField parseData = OBJECT_MAPPER.readValue(dataFieldStr, 
DataField.class);
         assertEquals(id, parseData.id());
         assertEquals(name, parseData.name());
         assertEquals(type, parseData.type());
@@ -170,16 +176,17 @@ public class RESTObjectMapperTest {
     @Test
     public void renameTableRequestParseTest() throws Exception {
         RenameTableRequest request = MockRESTMessage.renameRequest("t2");
-        String requestStr = mapper.writeValueAsString(request);
-        RenameTableRequest parseData = mapper.readValue(requestStr, 
RenameTableRequest.class);
+        String requestStr = OBJECT_MAPPER.writeValueAsString(request);
+        RenameTableRequest parseData =
+                OBJECT_MAPPER.readValue(requestStr, RenameTableRequest.class);
         assertEquals(request.getNewIdentifier(), parseData.getNewIdentifier());
     }
 
     @Test
     public void getTableResponseParseTest() throws Exception {
         GetTableResponse response = MockRESTMessage.getTableResponse();
-        String responseStr = mapper.writeValueAsString(response);
-        GetTableResponse parseData = mapper.readValue(responseStr, 
GetTableResponse.class);
+        String responseStr = OBJECT_MAPPER.writeValueAsString(response);
+        GetTableResponse parseData = OBJECT_MAPPER.readValue(responseStr, 
GetTableResponse.class);
         assertEquals(response.getSchemaId(), parseData.getSchemaId());
         assertEquals(response.getSchema(), parseData.getSchema());
     }
@@ -187,25 +194,26 @@ public class RESTObjectMapperTest {
     @Test
     public void listTablesResponseParseTest() throws Exception {
         ListTablesResponse response = MockRESTMessage.listTablesResponse();
-        String responseStr = mapper.writeValueAsString(response);
-        ListTablesResponse parseData = mapper.readValue(responseStr, 
ListTablesResponse.class);
+        String responseStr = OBJECT_MAPPER.writeValueAsString(response);
+        ListTablesResponse parseData =
+                OBJECT_MAPPER.readValue(responseStr, ListTablesResponse.class);
         assertEquals(response.getTables(), parseData.getTables());
     }
 
     @Test
     public void alterTableRequestParseTest() throws Exception {
         AlterTableRequest request = MockRESTMessage.alterTableRequest();
-        String requestStr = mapper.writeValueAsString(request);
-        AlterTableRequest parseData = mapper.readValue(requestStr, AlterTableRequest.class);
+        String requestStr = OBJECT_MAPPER.writeValueAsString(request);
+        AlterTableRequest parseData = OBJECT_MAPPER.readValue(requestStr, AlterTableRequest.class);
-        assertEquals(parseData.getChanges().size(), parseData.getChanges().size());
+        assertEquals(request.getChanges().size(), parseData.getChanges().size());
     }
 
     @Test
     public void createPartitionRequestParseTest() throws JsonProcessingException {
         CreatePartitionRequest request = MockRESTMessage.createPartitionRequest("t1");
-        String requestStr = mapper.writeValueAsString(request);
+        String requestStr = OBJECT_MAPPER.writeValueAsString(request);
         CreatePartitionRequest parseData =
-                mapper.readValue(requestStr, CreatePartitionRequest.class);
+                OBJECT_MAPPER.readValue(requestStr, CreatePartitionRequest.class);
-        assertEquals(parseData.getIdentifier(), parseData.getIdentifier());
-        assertEquals(parseData.getPartitionSpec().size(), parseData.getPartitionSpec().size());
+        assertEquals(request.getIdentifier(), parseData.getIdentifier());
+        assertEquals(request.getPartitionSpec().size(), parseData.getPartitionSpec().size());
     }
@@ -213,17 +221,18 @@ public class RESTObjectMapperTest {
     @Test
     public void dropPartitionRequestParseTest() throws JsonProcessingException {
         DropPartitionRequest request = MockRESTMessage.dropPartitionRequest();
-        String requestStr = mapper.writeValueAsString(request);
-        DropPartitionRequest parseData = mapper.readValue(requestStr, DropPartitionRequest.class);
+        String requestStr = OBJECT_MAPPER.writeValueAsString(request);
+        DropPartitionRequest parseData =
+                OBJECT_MAPPER.readValue(requestStr, DropPartitionRequest.class);
-        assertEquals(parseData.getPartitionSpec().size(), parseData.getPartitionSpec().size());
+        assertEquals(request.getPartitionSpec().size(), parseData.getPartitionSpec().size());
     }
 
     @Test
     public void listPartitionsResponseParseTest() throws Exception {
         ListPartitionsResponse response = 
MockRESTMessage.listPartitionsResponse();
-        String responseStr = mapper.writeValueAsString(response);
+        String responseStr = OBJECT_MAPPER.writeValueAsString(response);
         ListPartitionsResponse parseData =
-                mapper.readValue(responseStr, ListPartitionsResponse.class);
+                OBJECT_MAPPER.readValue(responseStr, 
ListPartitionsResponse.class);
         assertEquals(
                 response.getPartitions().get(0).fileCount(),
                 parseData.getPartitions().get(0).fileCount());
@@ -232,10 +241,11 @@ public class RESTObjectMapperTest {
     @Test
     public void partitionResponseParseTest() throws Exception {
         PartitionResponse response = MockRESTMessage.partitionResponse();
-        assertDoesNotThrow(() -> mapper.writeValueAsString(response));
+        assertDoesNotThrow(() -> OBJECT_MAPPER.writeValueAsString(response));
         assertDoesNotThrow(
                 () ->
-                        mapper.readValue(
-                                mapper.writeValueAsString(response), 
PartitionResponse.class));
+                        OBJECT_MAPPER.readValue(
+                                OBJECT_MAPPER.writeValueAsString(response),
+                                PartitionResponse.class));
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthSessionTest.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthSessionTest.java
index 1f4a48fd5e..fec7492082 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthSessionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthSessionTest.java
@@ -47,6 +47,23 @@ public class AuthSessionTest {
 
     @Rule public TemporaryFolder folder = new TemporaryFolder();
 
+    @Test
+    public void testBearToken() {
+        String token = UUID.randomUUID().toString();
+        Map<String, String> initialHeaders = new HashMap<>();
+        initialHeaders.put("k1", "v1");
+        initialHeaders.put("k2", "v2");
+        CredentialsProvider credentialsProvider = new BearTokenCredentialsProvider(token);
+        AuthSession session = new AuthSession(initialHeaders, credentialsProvider);
+        Map<String, String> header = session.getHeaders();
+        assertEquals("Bearer " + token, header.get("Authorization"));
+        // Every initial header must be passed through untouched; only Authorization is added.
+        for (Map.Entry<String, String> entry : initialHeaders.entrySet()) {
+            assertEquals(entry.getValue(), header.get(entry.getKey()));
+        }
+        assertEquals(initialHeaders.size() + 1, header.size());
+    }
+
     @Test
     public void testRefreshBearTokenFileCredentialsProvider()
             throws IOException, InterruptedException {
diff --git a/paimon-flink/paimon-flink-common/pom.xml 
b/paimon-flink/paimon-flink-common/pom.xml
index e0f7ce245f..84d4622b02 100644
--- a/paimon-flink/paimon-flink-common/pom.xml
+++ b/paimon-flink/paimon-flink-common/pom.xml
@@ -177,6 +177,13 @@ under the License.
             <type>jar</type>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>mockwebserver</artifactId>
+            <version>${okhttp.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java
index 19aa6d5d74..01d615c3e1 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java
@@ -75,7 +75,9 @@ public abstract class CatalogITCaseBase extends 
AbstractTestBase {
 
         Map<String, String> options = new HashMap<>(catalogOptions());
         options.put("type", "paimon");
-        options.put("warehouse", toWarehouse(path));
+        if (supportDefineWarehouse()) {
+            options.put("warehouse", toWarehouse(path));
+        }
         tEnv.executeSql(
                 String.format(
                         "CREATE CATALOG %s WITH (" + "%s" + inferScan + ")",
@@ -97,6 +99,10 @@ public abstract class CatalogITCaseBase extends 
AbstractTestBase {
         return Collections.emptyMap();
     }
 
+    protected boolean supportDefineWarehouse() {
+        return true;
+    }
+
     protected boolean inferScanParallelism() {
         return false;
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
new file mode 100644
index 0000000000..c310de32fd
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.rest.RESTCatalogOptions;
+import org.apache.paimon.rest.RESTCatalogServer;
+
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * ITCase for REST catalog.
+ *
+ * <p>Each test runs against a freshly started {@link RESTCatalogServer} with one database and
+ * one table created through Flink SQL; both are dropped and the server is shut down afterwards.
+ */
+class RESTCatalogITCase extends CatalogITCaseBase {
+
+    private static final String DATABASE_NAME = "mydb";
+    private static final String TABLE_NAME = "t1";
+    // Single source for the token handed to both the server and the catalog options.
+    private static final String INIT_TOKEN = "init_token";
+
+    private RESTCatalogServer restCatalogServer;
+    private String serverUrl;
+    private String warehouse;
+    @TempDir java.nio.file.Path tempFile;
+
+    @BeforeEach
+    @Override
+    public void before() throws IOException {
+        warehouse = tempFile.toUri().toString();
+        restCatalogServer = new RESTCatalogServer(warehouse, INIT_TOKEN);
+        restCatalogServer.start();
+        // catalogOptions() reads serverUrl, which super.before() presumably consumes.
+        serverUrl = restCatalogServer.getUrl();
+        super.before();
+        sql(String.format("CREATE DATABASE %s", DATABASE_NAME));
+        sql(String.format("CREATE TABLE %s.%s (a STRING, b DOUBLE)", DATABASE_NAME, TABLE_NAME));
+    }
+
+    /** Drops the table and database, and always shuts the server down even if a DROP fails. */
+    @AfterEach
+    public void after() throws IOException {
+        try {
+            sql(String.format("DROP TABLE  %s.%s", DATABASE_NAME, TABLE_NAME));
+            sql(String.format("DROP DATABASE %s", DATABASE_NAME));
+        } finally {
+            // before() may have failed prior to creating the server.
+            if (restCatalogServer != null) {
+                restCatalogServer.shutdown();
+            }
+        }
+    }
+
+    @Test
+    void testCreateTable() {
+        List<Row> result = sql(String.format("SHOW CREATE TABLE %s.%s", DATABASE_NAME, TABLE_NAME));
+        assertThat(result.toString())
+                .contains(
+                        String.format(
+                                "CREATE TABLE `PAIMON`.`%s`.`%s` (\n"
+                                        + "  `a` VARCHAR(2147483647),\n"
+                                        + "  `b` DOUBLE",
+                                DATABASE_NAME, TABLE_NAME));
+    }
+
+    @Test
+    void testAlterTable() {
+        sql(String.format("ALTER TABLE %s.%s ADD e INT AFTER b", DATABASE_NAME, TABLE_NAME));
+        sql(String.format("ALTER TABLE %s.%s DROP b", DATABASE_NAME, TABLE_NAME));
+        sql(String.format("ALTER TABLE %s.%s RENAME a TO a1", DATABASE_NAME, TABLE_NAME));
+        sql(String.format("ALTER TABLE %s.%s MODIFY e DOUBLE", DATABASE_NAME, TABLE_NAME));
+        List<Row> result = sql(String.format("SHOW CREATE TABLE %s.%s", DATABASE_NAME, TABLE_NAME));
+        assertThat(result.toString())
+                .contains(
+                        String.format(
+                                "CREATE TABLE `PAIMON`.`%s`.`%s` (\n"
+                                        + "  `a1` VARCHAR(2147483647),\n"
+                                        + "  `e` DOUBLE",
+                                DATABASE_NAME, TABLE_NAME));
+    }
+
+    @Override
+    protected Map<String, String> catalogOptions() {
+        Map<String, String> options = new HashMap<>();
+        options.put("metastore", "rest");
+        options.put(RESTCatalogOptions.URI.key(), serverUrl);
+        options.put(RESTCatalogOptions.TOKEN.key(), INIT_TOKEN);
+        options.put(RESTCatalogOptions.THREAD_POOL_SIZE.key(), "1");
+        return options;
+    }
+
+    @Override
+    protected String getTempDirPath() {
+        return this.warehouse;
+    }
+
+    /** Returns false so the base class skips the {@code warehouse} catalog option. */
+    @Override
+    protected boolean supportDefineWarehouse() {
+        return false;
+    }
+}
diff --git 
a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
 
b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
index d96fac808c..e185e5acbf 100644
--- 
a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
+++ 
b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
@@ -172,11 +172,6 @@ public class HiveCatalogTest extends CatalogTestBase {
         assertThat(hiveConf.get("hive.metastore.uris")).isEqualTo("dummy-hms");
     }
 
-    @Test
-    public void testAlterDatabase() throws Exception {
-        this.alterDatabaseWhenSupportAlter();
-    }
-
     @Test
     public void testAddHiveTableParameters() {
         try {
@@ -503,4 +498,9 @@ public class HiveCatalogTest extends CatalogTestBase {
         // hive catalog list partitions from filesystem, so here return empty.
         assertThat(catalog.listPartitions(identifier)).isEmpty();
     }
+
+    @Override
+    protected boolean supportsAlterDatabase() {
+        return true;
+    }
 }
diff --git a/pom.xml b/pom.xml
index 6600f04090..d55694cec7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -125,6 +125,7 @@ under the License.
         <zstd-jni.version>1.5.5-11</zstd-jni.version>
         <janino.version>3.0.11</janino.version>
         <mockito.version>3.4.6</mockito.version>
+        <okhttp.version>4.12.0</okhttp.version>
         <jaxb.api.version>2.3.1</jaxb.api.version>
         <findbugs.version>1.3.9</findbugs.version>
         <json-smart.version>2.4.9</json-smart.version>

Reply via email to