This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 75e9da48ed [core] Support in memory catalog for test RESTCatalog 
(#5154)
75e9da48ed is described below

commit 75e9da48ed32586f5c930f96b23dc118cd214df9
Author: jerry <[email protected]>
AuthorDate: Wed Feb 26 17:56:55 2025 +0800

    [core] Support in memory catalog for test RESTCatalog (#5154)
---
 .../org/apache/paimon/catalog/AbstractCatalog.java |   2 +-
 .../apache/paimon/catalog/FileSystemCatalog.java   |   3 +-
 .../java/org/apache/paimon/rest/RESTCatalog.java   |   6 +-
 .../rest/MetadataInMemoryFileSystemCatalog.java    | 443 +++++++++++++++++++++
 .../org/apache/paimon/rest/RESTCatalogServer.java  |  74 +++-
 .../org/apache/paimon/rest/RESTCatalogTest.java    |  18 +-
 .../org/apache/paimon/rest/TestRESTCatalog.java    | 264 ------------
 7 files changed, 526 insertions(+), 284 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 3df3f0d562..67d6b23187 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -216,7 +216,7 @@ public abstract class AbstractCatalog implements Catalog {
             if (changes == null || changes.isEmpty()) {
                 return;
             }
-            alterDatabaseImpl(name, changes);
+            this.alterDatabaseImpl(name, changes);
         } catch (DatabaseNotExistException e) {
             if (ignoreIfNotExists) {
                 return;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
index c1cc5bc230..366051fad5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
@@ -94,7 +94,8 @@ public class FileSystemCatalog extends AbstractCatalog {
     }
 
     @Override
-    protected void alterDatabaseImpl(String name, List<PropertyChange> 
changes) {
+    protected void alterDatabaseImpl(String name, List<PropertyChange> changes)
+            throws DatabaseNotExistException {
         throw new UnsupportedOperationException("Alter database is not 
supported.");
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index 9f373bd10a..141b22b45d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -241,9 +241,9 @@ public class RESTCatalog implements Catalog, 
SupportsSnapshots {
                             request,
                             AlterDatabaseResponse.class,
                             restAuthFunction);
-            if (response.getUpdated().isEmpty()) {
-                throw new IllegalStateException("Failed to update properties");
-            }
+            //            if (response.getUpdated().isEmpty()) {
+            //                throw new IllegalStateException("Failed to 
update properties");
+            //            }
         } catch (NoSuchResourceException e) {
             if (!ignoreIfNotExists) {
                 throw new DatabaseNotExistException(name);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/MetadataInMemoryFileSystemCatalog.java
 
b/paimon-core/src/test/java/org/apache/paimon/rest/MetadataInMemoryFileSystemCatalog.java
new file mode 100644
index 0000000000..e0c5ab9446
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/rest/MetadataInMemoryFileSystemCatalog.java
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.TableType;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Database;
+import org.apache.paimon.catalog.FileSystemCatalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.catalog.PropertyChange;
+import org.apache.paimon.catalog.SupportsSnapshots;
+import org.apache.paimon.catalog.TableMetadata;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.Partition;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.view.View;
+
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.CoreOptions.PATH;
+
+/** A catalog for testing RESTCatalog. */
+public class MetadataInMemoryFileSystemCatalog extends FileSystemCatalog
+        implements SupportsSnapshots {
+
+    public final Map<String, Database> databaseStore;
+    public final Map<String, TableMetadata> tableMetadataStore;
+    public final Map<String, List<Partition>> tablePartitionsStore;
+    public final Map<String, View> viewStore;
+    public final Map<String, Snapshot> tableSnapshotStore;
+    public final Map<String, RESTToken> dataTokenStore;
+    public FileSystemCatalog fileSystemCatalog;
+
+    public MetadataInMemoryFileSystemCatalog(
+            FileIO fileIO,
+            Path warehouse,
+            Options options,
+            Map<String, Database> databaseStore,
+            Map<String, TableMetadata> tableMetadataStore,
+            Map<String, Snapshot> tableSnapshotStore,
+            Map<String, List<Partition>> tablePartitionsStore,
+            Map<String, View> viewStore,
+            Map<String, RESTToken> dataTokenStore) {
+        super(fileIO, warehouse, options);
+        this.fileSystemCatalog = new FileSystemCatalog(fileIO, warehouse, 
options);
+        this.databaseStore = databaseStore;
+        this.tableMetadataStore = tableMetadataStore;
+        this.tablePartitionsStore = tablePartitionsStore;
+        this.tableSnapshotStore = tableSnapshotStore;
+        this.viewStore = viewStore;
+        this.dataTokenStore = dataTokenStore;
+    }
+
+    public static MetadataInMemoryFileSystemCatalog create(
+            CatalogContext context,
+            Map<String, Database> databaseStore,
+            Map<String, TableMetadata> tableMetadataStore,
+            Map<String, Snapshot> tableSnapshotStore,
+            Map<String, List<Partition>> tablePartitionsStore,
+            Map<String, View> viewStore,
+            Map<String, RESTToken> dataTokenStore) {
+        String warehouse = 
CatalogFactory.warehouse(context).toUri().toString();
+
+        Path warehousePath = new Path(warehouse);
+        FileIO fileIO;
+
+        try {
+            fileIO = FileIO.get(warehousePath, context);
+            fileIO.checkOrMkdirs(warehousePath);
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+
+        return new MetadataInMemoryFileSystemCatalog(
+                fileIO,
+                warehousePath,
+                context.options(),
+                databaseStore,
+                tableMetadataStore,
+                tableSnapshotStore,
+                tablePartitionsStore,
+                viewStore,
+                dataTokenStore);
+    }
+
+    @Override
+    public List<String> listDatabases() {
+        return new ArrayList<>(databaseStore.keySet());
+    }
+
+    @Override
+    protected void createDatabaseImpl(String name, Map<String, String> 
properties) {
+        super.createDatabaseImpl(name, properties);
+        databaseStore.put(name, Database.of(name, properties, null));
+    }
+
+    @Override
+    public Database getDatabaseImpl(String name) throws 
DatabaseNotExistException {
+        if (databaseStore.containsKey(name)) {
+            return databaseStore.get(name);
+        }
+        throw new DatabaseNotExistException(name);
+    }
+
+    @Override
+    protected void dropDatabaseImpl(String name) {
+        super.dropDatabaseImpl(name);
+        databaseStore.remove(name);
+    }
+
+    protected void alterDatabaseImpl(String name, List<PropertyChange> changes)
+            throws DatabaseNotExistException {
+        if (databaseStore.containsKey(name)) {
+            Pair<Map<String, String>, Set<String>> setPropertiesToRemoveKeys =
+                    PropertyChange.getSetPropertiesToRemoveKeys(changes);
+            Map<String, String> setProperties = 
setPropertiesToRemoveKeys.getLeft();
+            Set<String> removeKeys = setPropertiesToRemoveKeys.getRight();
+            Database database = databaseStore.get(name);
+            Map<String, String> parameter = new HashMap<>(database.options());
+            if (!setProperties.isEmpty()) {
+                parameter.putAll(setProperties);
+            }
+            if (!removeKeys.isEmpty()) {
+                parameter.keySet().removeAll(removeKeys);
+            }
+            Database alterDatabase = Database.of(name, parameter, null);
+            databaseStore.put(name, alterDatabase);
+        } else {
+            throw new DatabaseNotExistException(name);
+        }
+    }
+
+    @Override
+    protected List<String> listTablesImpl(String databaseName) {
+        List<String> tables = new ArrayList<>();
+        for (Map.Entry<String, TableMetadata> entry : 
tableMetadataStore.entrySet()) {
+            Identifier identifier = Identifier.fromString(entry.getKey());
+            if (databaseName.equals(identifier.getDatabaseName())) {
+                tables.add(identifier.getTableName());
+            }
+        }
+        return tables;
+    }
+
+    @Override
+    public void createTableImpl(Identifier identifier, Schema schema) {
+        super.createTableImpl(identifier, schema);
+        try {
+            TableMetadata tableMetadata =
+                    createTableMetadata(
+                            identifier, 1L, schema, 
UUID.randomUUID().toString(), false);
+            tableMetadataStore.put(identifier.getFullName(), tableMetadata);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private TableMetadata createTableMetadata(
+            Identifier identifier, long schemaId, Schema schema, String uuid, 
boolean isExternal) {
+        Map<String, String> options = new HashMap<>(schema.options());
+        Path path = getTableLocation(identifier);
+        options.put(PATH.key(), path.toString());
+        TableSchema tableSchema =
+                new TableSchema(
+                        schemaId,
+                        schema.fields(),
+                        schema.fields().size() - 1,
+                        schema.partitionKeys(),
+                        schema.primaryKeys(),
+                        options,
+                        schema.comment());
+        TableMetadata tableMetadata = new TableMetadata(tableSchema, 
isExternal, uuid);
+        return tableMetadata;
+    }
+
+    @Override
+    protected void dropTableImpl(Identifier identifier) {
+        if (tableMetadataStore.containsKey(identifier.getFullName())) {
+            tableMetadataStore.remove(identifier.getFullName());
+            super.dropTableImpl(identifier);
+        }
+    }
+
+    @Override
+    public void renameTableImpl(Identifier fromTable, Identifier toTable) {
+        if (tableMetadataStore.containsKey(fromTable.getFullName())) {
+            super.renameTableImpl(fromTable, toTable);
+            TableMetadata tableMetadata = 
tableMetadataStore.get(fromTable.getFullName());
+            tableMetadataStore.remove(fromTable.getFullName());
+            tableMetadataStore.put(toTable.getFullName(), tableMetadata);
+        }
+    }
+
+    @Override
+    protected void alterTableImpl(Identifier identifier, List<SchemaChange> 
changes)
+            throws TableNotExistException, ColumnAlreadyExistException, 
ColumnNotExistException {
+        if (tableMetadataStore.containsKey(identifier.getFullName())) {
+            TableMetadata tableMetadata = 
tableMetadataStore.get(identifier.getFullName());
+            TableSchema schema = tableMetadata.schema();
+            Options options = Options.fromMap(schema.options());
+            if (options.get(CoreOptions.TYPE) == TableType.FORMAT_TABLE) {
+                throw new UnsupportedOperationException("Only data table 
support alter table.");
+            }
+            SchemaManager schemaManager = schemaManager(identifier);
+            try {
+                TableSchema newSchema =
+                        runWithLock(identifier, () -> 
schemaManager.commitChanges(changes));
+                TableMetadata newTableMetadata =
+                        createTableMetadata(
+                                identifier,
+                                newSchema.id(),
+                                newSchema.toSchema(),
+                                tableMetadata.uuid(),
+                                tableMetadata.isExternal());
+                tableMetadataStore.put(identifier.getFullName(), 
newTableMetadata);
+            } catch (TableNotExistException
+                    | ColumnAlreadyExistException
+                    | ColumnNotExistException
+                    | RuntimeException e) {
+                throw e;
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    private SchemaManager schemaManager(Identifier identifier) {
+        Path path = getTableLocation(identifier);
+        return new SchemaManager(fileIO, path, 
identifier.getBranchNameOrDefault());
+    }
+
+    @Override
+    public void createFormatTable(Identifier identifier, Schema schema) {
+        TableMetadata tableMetadata =
+                createTableMetadata(identifier, 1L, schema, 
UUID.randomUUID().toString(), true);
+        tableMetadataStore.put(identifier.getFullName(), tableMetadata);
+    }
+
+    @Override
+    public TableSchema loadTableSchema(Identifier identifier) throws 
TableNotExistException {
+        if (tableMetadataStore.containsKey(identifier.getFullName())) {
+            return tableMetadataStore.get(identifier.getFullName()).schema();
+        }
+        throw new TableNotExistException(identifier);
+    }
+
+    @Override
+    protected TableMetadata loadTableMetadata(Identifier identifier) throws 
TableNotExistException {
+        if (tableMetadataStore.containsKey(identifier.getFullName())) {
+            return tableMetadataStore.get(identifier.getFullName());
+        }
+        throw new TableNotExistException(identifier);
+    }
+
+    @Override
+    public void createPartitions(Identifier identifier, List<Map<String, 
String>> partitions)
+            throws TableNotExistException {
+        getTable(identifier);
+        tablePartitionsStore.put(
+                identifier.getFullName(),
+                partitions.stream()
+                        .map(partition -> spec2Partition(partition))
+                        .collect(Collectors.toList()));
+    }
+
+    @Override
+    public void dropPartitions(Identifier identifier, List<Map<String, 
String>> partitions)
+            throws TableNotExistException {
+        getTable(identifier);
+        List<Partition> existPartitions = 
tablePartitionsStore.get(identifier.getFullName());
+        partitions.forEach(
+                partition -> {
+                    for (Map.Entry<String, String> entry : 
partition.entrySet()) {
+                        existPartitions.stream()
+                                .filter(
+                                        p ->
+                                                
p.spec().containsKey(entry.getKey())
+                                                        && p.spec()
+                                                                
.get(entry.getKey())
+                                                                
.equals(entry.getValue()))
+                                .findFirst()
+                                .ifPresent(
+                                        existPartition -> 
existPartitions.remove(existPartition));
+                    }
+                });
+    }
+
+    @Override
+    public void alterPartitions(Identifier identifier, List<Partition> 
partitions)
+            throws TableNotExistException {
+        getTable(identifier);
+        List<Partition> existPartitions = 
tablePartitionsStore.get(identifier.getFullName());
+        partitions.forEach(
+                partition -> {
+                    for (Map.Entry<String, String> entry : 
partition.spec().entrySet()) {
+                        existPartitions.stream()
+                                .filter(
+                                        p ->
+                                                
p.spec().containsKey(entry.getKey())
+                                                        && p.spec()
+                                                                
.get(entry.getKey())
+                                                                
.equals(entry.getValue()))
+                                .findFirst()
+                                .ifPresent(
+                                        existPartition -> 
existPartitions.remove(existPartition));
+                    }
+                });
+        existPartitions.addAll(partitions);
+        tablePartitionsStore.put(identifier.getFullName(), existPartitions);
+    }
+
+    @Override
+    public List<Partition> listPartitions(Identifier identifier) throws 
TableNotExistException {
+        getTable(identifier);
+        return tablePartitionsStore.get(identifier.getFullName());
+    }
+
+    @Override
+    public View getView(Identifier identifier) throws ViewNotExistException {
+        if (viewStore.containsKey(identifier.getFullName())) {
+            return viewStore.get(identifier.getFullName());
+        }
+        throw new ViewNotExistException(identifier);
+    }
+
+    @Override
+    public void dropView(Identifier identifier, boolean ignoreIfNotExists)
+            throws ViewNotExistException {
+        if (viewStore.containsKey(identifier.getFullName())) {
+            viewStore.remove(identifier.getFullName());
+        }
+        if (!ignoreIfNotExists) {
+            throw new ViewNotExistException(identifier);
+        }
+    }
+
+    @Override
+    public void createView(Identifier identifier, View view, boolean 
ignoreIfExists)
+            throws ViewAlreadyExistException, DatabaseNotExistException {
+        getDatabase(identifier.getDatabaseName());
+        if (viewStore.containsKey(identifier.getFullName()) && 
!ignoreIfExists) {
+            throw new ViewAlreadyExistException(identifier);
+        }
+        viewStore.put(identifier.getFullName(), view);
+    }
+
+    @Override
+    public List<String> listViews(String databaseName) throws 
DatabaseNotExistException {
+        getDatabase(databaseName);
+        return viewStore.keySet().stream()
+                .map(Identifier::fromString)
+                .filter(identifier -> 
identifier.getDatabaseName().equals(databaseName))
+                .map(Identifier::getTableName)
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public void renameView(Identifier fromView, Identifier toView, boolean 
ignoreIfNotExists)
+            throws ViewNotExistException, ViewAlreadyExistException {
+        if (!viewStore.containsKey(fromView.getFullName()) && 
!ignoreIfNotExists) {
+            throw new ViewNotExistException(fromView);
+        }
+        if (viewStore.containsKey(toView.getFullName())) {
+            throw new ViewAlreadyExistException(toView);
+        }
+        if (viewStore.containsKey(fromView.getFullName())) {
+            View view = viewStore.get(fromView.getFullName());
+            viewStore.remove(fromView.getFullName());
+            viewStore.put(toView.getFullName(), view);
+        }
+    }
+
+    @Override
+    public boolean commitSnapshot(
+            Identifier identifier, Snapshot snapshot, List<Partition> 
statistics)
+            throws TableNotExistException {
+        tableSnapshotStore.put(identifier.getFullName(), snapshot);
+        return false;
+    }
+
+    @Override
+    public Optional<Snapshot> loadSnapshot(Identifier identifier) throws 
TableNotExistException {
+        return 
Optional.ofNullable(tableSnapshotStore.get(identifier.getFullName()));
+    }
+
+    public RESTToken getToken(Identifier identifier) {
+        if (dataTokenStore.containsKey(identifier.getFullName())) {
+            return dataTokenStore.get(identifier.getFullName());
+        }
+        long currentTimeMillis = System.currentTimeMillis();
+        RESTToken token =
+                new RESTToken(
+                        ImmutableMap.of(
+                                "akId",
+                                "akId" + currentTimeMillis,
+                                "akSecret",
+                                "akSecret" + currentTimeMillis),
+                        currentTimeMillis);
+        dataTokenStore.put(identifier.getFullName(), token);
+        return dataTokenStore.get(identifier.getFullName());
+    }
+
+    private Partition spec2Partition(Map<String, String> spec) {
+        // todo: need update
+        return new Partition(spec, 123, 456, 789, 123);
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index 564e1e5d8a..fd84bf7924 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -19,16 +19,20 @@
 package org.apache.paimon.rest;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.Database;
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.catalog.PropertyChange;
 import org.apache.paimon.catalog.RenamingSnapshotCommit;
+import org.apache.paimon.catalog.TableMetadata;
 import org.apache.paimon.operation.Lock;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.Partition;
 import org.apache.paimon.rest.auth.BearTokenAuthProvider;
+import org.apache.paimon.rest.requests.AlterDatabaseRequest;
 import org.apache.paimon.rest.requests.AlterPartitionsRequest;
 import org.apache.paimon.rest.requests.AlterTableRequest;
 import org.apache.paimon.rest.requests.CommitTableRequest;
@@ -39,6 +43,7 @@ import org.apache.paimon.rest.requests.CreateViewRequest;
 import org.apache.paimon.rest.requests.DropPartitionsRequest;
 import org.apache.paimon.rest.requests.MarkDonePartitionsRequest;
 import org.apache.paimon.rest.requests.RenameTableRequest;
+import org.apache.paimon.rest.responses.AlterDatabaseResponse;
 import org.apache.paimon.rest.responses.CommitTableResponse;
 import org.apache.paimon.rest.responses.CreateDatabaseResponse;
 import org.apache.paimon.rest.responses.ErrorResponse;
@@ -67,16 +72,18 @@ import okhttp3.mockwebserver.Dispatcher;
 import okhttp3.mockwebserver.MockResponse;
 import okhttp3.mockwebserver.MockWebServer;
 import okhttp3.mockwebserver.RecordedRequest;
-import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
+import java.util.stream.Collectors;
 
 import static org.apache.paimon.rest.RESTObjectMapper.OBJECT_MAPPER;
-import static 
org.apache.paimon.utils.SnapshotManagerTest.createSnapshotWithMillis;
 
 /** Mock REST server for testing. */
 public class RESTCatalogServer {
@@ -84,16 +91,31 @@ public class RESTCatalogServer {
     private static final String PREFIX = "paimon";
     private static final String DATABASE_URI = 
String.format("/v1/%s/databases", PREFIX);
 
-    private final Catalog catalog;
+    private final MetadataInMemoryFileSystemCatalog catalog;
     private final Dispatcher dispatcher;
     private final MockWebServer server;
     private final String authToken;
 
+    public final Map<String, Database> databaseStore = new HashMap<>();
+    public final Map<String, TableMetadata> tableMetadataStore = new 
HashMap<>();
+    public final Map<String, List<Partition>> tablePartitionsStore = new 
HashMap<>();
+    public final Map<String, View> viewStore = new HashMap<>();
+    public final Map<String, Snapshot> tableSnapshotStore = new HashMap<>();
+    Map<String, RESTToken> dataTokenStore = new HashMap<>();
+
     public RESTCatalogServer(String warehouse, String initToken) {
         authToken = initToken;
         Options conf = new Options();
         conf.setString("warehouse", warehouse);
-        this.catalog = TestRESTCatalog.create(CatalogContext.create(conf));
+        this.catalog =
+                MetadataInMemoryFileSystemCatalog.create(
+                        CatalogContext.create(conf),
+                        databaseStore,
+                        tableMetadataStore,
+                        tableSnapshotStore,
+                        tablePartitionsStore,
+                        viewStore,
+                        dataTokenStore);
         this.dispatcher = initDispatcher(catalog, warehouse, authToken);
         MockWebServer mockWebServer = new MockWebServer();
         mockWebServer.setDispatcher(dispatcher);
@@ -112,7 +134,16 @@ public class RESTCatalogServer {
         server.shutdown();
     }
 
-    public static Dispatcher initDispatcher(Catalog catalog, String warehouse, 
String authToken) {
+    public void setTableSnapshot(Identifier identifier, Snapshot snapshot) {
+        tableSnapshotStore.put(identifier.getFullName(), snapshot);
+    }
+
+    public void setDataToken(Identifier identifier, RESTToken token) {
+        dataTokenStore.put(identifier.getFullName(), token);
+    }
+
+    public static Dispatcher initDispatcher(
+            MetadataInMemoryFileSystemCatalog catalog, String warehouse, 
String authToken) {
         return new Dispatcher() {
             @Override
             public MockResponse dispatch(RecordedRequest request) {
@@ -241,17 +272,22 @@ public class RESTCatalogServer {
                             }
                             return partitionsApiHandler(catalog, request, 
databaseName, tableName);
                         } else if (isTableToken) {
+                            RESTToken dataToken =
+                                    
catalog.getToken(Identifier.create(databaseName, resources[2]));
                             GetTableTokenResponse getTableTokenResponse =
                                     new GetTableTokenResponse(
-                                            ImmutableMap.of("key", "value"),
-                                            System.currentTimeMillis());
+                                            dataToken.token(), 
dataToken.expireAtMillis());
                             return new MockResponse()
                                     .setResponseCode(200)
                                     .setBody(
                                             OBJECT_MAPPER.writeValueAsString(
                                                     getTableTokenResponse));
                         } else if (isTableSnapshot) {
-                            if (!"my_snapshot_table".equals(resources[2])) {
+                            String tableName = resources[2];
+                            Optional<Snapshot> snapshotOptional =
+                                    catalog.loadSnapshot(
+                                            Identifier.create(databaseName, 
tableName));
+                            if (!snapshotOptional.isPresent()) {
                                 response =
                                         new ErrorResponse(
                                                 
ErrorResponseResourceType.SNAPSHOT,
@@ -261,8 +297,7 @@ public class RESTCatalogServer {
                                 return mockResponse(response, 404);
                             }
                             GetTableSnapshotResponse getTableSnapshotResponse =
-                                    new GetTableSnapshotResponse(
-                                            createSnapshotWithMillis(10086, 
100));
+                                    new 
GetTableSnapshotResponse(snapshotOptional.get());
                             return new MockResponse()
                                     .setResponseCode(200)
                                     .setBody(
@@ -447,6 +482,25 @@ public class RESTCatalogServer {
             case "DELETE":
                 catalog.dropDatabase(databaseName, false, true);
                 return new MockResponse().setResponseCode(200);
+            case "POST":
+                AlterDatabaseRequest requestBody =
+                        OBJECT_MAPPER.readValue(
+                                request.getBody().readUtf8(), 
AlterDatabaseRequest.class);
+                List<PropertyChange> changes = new ArrayList<>();
+                for (String property : requestBody.getRemovals()) {
+                    changes.add(PropertyChange.removeProperty(property));
+                }
+                for (Map.Entry<String, String> entry : 
requestBody.getUpdates().entrySet()) {
+                    changes.add(PropertyChange.setProperty(entry.getKey(), 
entry.getValue()));
+                }
+                catalog.alterDatabase(databaseName, changes, false);
+                AlterDatabaseResponse alterDatabaseResponse =
+                        new AlterDatabaseResponse(
+                                requestBody.getRemovals(),
+                                requestBody.getUpdates().keySet().stream()
+                                        .collect(Collectors.toList()),
+                                Collections.emptyList());
+                return mockResponse(alterDatabaseResponse, 200);
             default:
                 return new MockResponse().setResponseCode(404);
         }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index b0cf23202a..3ba1943d10 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -50,6 +50,7 @@ import java.util.Map;
 import java.util.Optional;
 
 import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE;
+import static 
org.apache.paimon.utils.SnapshotManagerTest.createSnapshotWithMillis;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -149,12 +150,14 @@ class RESTCatalogTest extends CatalogTestBase {
         options.set(RESTCatalogOptions.TOKEN, initToken);
         options.set(RESTCatalogOptions.TOKEN_PROVIDER, 
AuthProviderEnum.BEAR.identifier());
         RESTCatalog catalog = new RESTCatalog(CatalogContext.create(options));
-
-        Optional<Snapshot> snapshot =
-                catalog.loadSnapshot(Identifier.create("test_db_a", 
"my_snapshot_table"));
+        Identifier hasSnapshotTable = Identifier.create("test_db_a", 
"my_snapshot_table");
+        long id = 10086;
+        long millis = System.currentTimeMillis();
+        restCatalogServer.setTableSnapshot(hasSnapshotTable, 
createSnapshotWithMillis(id, millis));
+        Optional<Snapshot> snapshot = catalog.loadSnapshot(hasSnapshotTable);
         assertThat(snapshot).isPresent();
-        assertThat(snapshot.get().id()).isEqualTo(10086);
-        assertThat(snapshot.get().timeMillis()).isEqualTo(100);
+        assertThat(snapshot.get().id()).isEqualTo(id);
+        assertThat(snapshot.get().timeMillis()).isEqualTo(millis);
 
         snapshot = catalog.loadSnapshot(Identifier.create("test_db_a", 
"unknown"));
         assertThat(snapshot).isEmpty();
@@ -175,6 +178,11 @@ class RESTCatalogTest extends CatalogTestBase {
         return true;
     }
 
+    @Override
+    protected boolean supportsAlterDatabase() {
+        return true;
+    }
+
     private void createTable(
             Identifier identifier, Map<String, String> options, List<String> 
partitionKeys)
             throws Exception {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/TestRESTCatalog.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/TestRESTCatalog.java
deleted file mode 100644
index a812965310..0000000000
--- a/paimon-core/src/test/java/org/apache/paimon/rest/TestRESTCatalog.java
+++ /dev/null
@@ -1,264 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.rest;
-
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.TableType;
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.catalog.CatalogFactory;
-import org.apache.paimon.catalog.FileSystemCatalog;
-import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.catalog.TableMetadata;
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.partition.Partition;
-import org.apache.paimon.schema.Schema;
-import org.apache.paimon.schema.SchemaChange;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.view.View;
-
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-/** A catalog for testing RESTCatalog. */
-public class TestRESTCatalog extends FileSystemCatalog {
-
-    public Map<String, TableSchema> tableFullName2Schema = new HashMap<String, 
TableSchema>();
-    public Map<String, List<Partition>> tableFullName2Partitions =
-            new HashMap<String, List<Partition>>();
-    public final Map<String, View> viewFullName2View = new HashMap<String, 
View>();
-
-    public TestRESTCatalog(FileIO fileIO, Path warehouse, Options options) {
-        super(fileIO, warehouse, options);
-    }
-
-    public static TestRESTCatalog create(CatalogContext context) {
-        String warehouse = 
CatalogFactory.warehouse(context).toUri().toString();
-
-        Path warehousePath = new Path(warehouse);
-        FileIO fileIO;
-
-        try {
-            fileIO = FileIO.get(warehousePath, context);
-            fileIO.checkOrMkdirs(warehousePath);
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
-        }
-
-        return new TestRESTCatalog(fileIO, warehousePath, context.options());
-    }
-
-    @Override
-    public void createPartitions(Identifier identifier, List<Map<String, 
String>> partitions)
-            throws TableNotExistException {
-        getTable(identifier);
-        tableFullName2Partitions.put(
-                identifier.getFullName(),
-                partitions.stream()
-                        .map(partition -> spec2Partition(partition))
-                        .collect(Collectors.toList()));
-    }
-
-    @Override
-    public void dropPartitions(Identifier identifier, List<Map<String, 
String>> partitions)
-            throws TableNotExistException {
-        getTable(identifier);
-        List<Partition> existPartitions = 
tableFullName2Partitions.get(identifier.getFullName());
-        partitions.forEach(
-                partition -> {
-                    for (Map.Entry<String, String> entry : 
partition.entrySet()) {
-                        existPartitions.stream()
-                                .filter(
-                                        p ->
-                                                
p.spec().containsKey(entry.getKey())
-                                                        && p.spec()
-                                                                
.get(entry.getKey())
-                                                                
.equals(entry.getValue()))
-                                .findFirst()
-                                .ifPresent(
-                                        existPartition -> 
existPartitions.remove(existPartition));
-                    }
-                });
-    }
-
-    @Override
-    public void alterPartitions(Identifier identifier, List<Partition> 
partitions)
-            throws TableNotExistException {
-        getTable(identifier);
-        List<Partition> existPartitions = 
tableFullName2Partitions.get(identifier.getFullName());
-        partitions.forEach(
-                partition -> {
-                    for (Map.Entry<String, String> entry : 
partition.spec().entrySet()) {
-                        existPartitions.stream()
-                                .filter(
-                                        p ->
-                                                
p.spec().containsKey(entry.getKey())
-                                                        && p.spec()
-                                                                
.get(entry.getKey())
-                                                                
.equals(entry.getValue()))
-                                .findFirst()
-                                .ifPresent(
-                                        existPartition -> 
existPartitions.remove(existPartition));
-                    }
-                });
-        existPartitions.addAll(partitions);
-        tableFullName2Partitions.put(identifier.getFullName(), 
existPartitions);
-    }
-
-    @Override
-    public List<Partition> listPartitions(Identifier identifier) throws 
TableNotExistException {
-        getTable(identifier);
-        return tableFullName2Partitions.get(identifier.getFullName());
-    }
-
-    @Override
-    public View getView(Identifier identifier) throws ViewNotExistException {
-        if (viewFullName2View.containsKey(identifier.getFullName())) {
-            return viewFullName2View.get(identifier.getFullName());
-        }
-        throw new ViewNotExistException(identifier);
-    }
-
-    @Override
-    public void dropView(Identifier identifier, boolean ignoreIfNotExists)
-            throws ViewNotExistException {
-        if (viewFullName2View.containsKey(identifier.getFullName())) {
-            viewFullName2View.remove(identifier.getFullName());
-        }
-        if (!ignoreIfNotExists) {
-            throw new ViewNotExistException(identifier);
-        }
-    }
-
-    @Override
-    public void createView(Identifier identifier, View view, boolean 
ignoreIfExists)
-            throws ViewAlreadyExistException, DatabaseNotExistException {
-        getDatabase(identifier.getDatabaseName());
-        if (viewFullName2View.containsKey(identifier.getFullName()) && 
!ignoreIfExists) {
-            throw new ViewAlreadyExistException(identifier);
-        }
-        viewFullName2View.put(identifier.getFullName(), view);
-    }
-
-    @Override
-    public List<String> listViews(String databaseName) throws 
DatabaseNotExistException {
-        getDatabase(databaseName);
-        return viewFullName2View.keySet().stream()
-                .map(v -> Identifier.fromString(v))
-                .filter(identifier -> 
identifier.getDatabaseName().equals(databaseName))
-                .map(identifier -> identifier.getTableName())
-                .collect(Collectors.toList());
-    }
-
-    @Override
-    public void renameView(Identifier fromView, Identifier toView, boolean 
ignoreIfNotExists)
-            throws ViewNotExistException, ViewAlreadyExistException {
-        if (!viewFullName2View.containsKey(fromView.getFullName()) && 
!ignoreIfNotExists) {
-            throw new ViewNotExistException(fromView);
-        }
-        if (viewFullName2View.containsKey(toView.getFullName())) {
-            throw new ViewAlreadyExistException(toView);
-        }
-        if (viewFullName2View.containsKey(fromView.getFullName())) {
-            View view = viewFullName2View.get(fromView.getFullName());
-            viewFullName2View.remove(fromView.getFullName());
-            viewFullName2View.put(toView.getFullName(), view);
-        }
-    }
-
-    @Override
-    protected List<String> listTablesImpl(String databaseName) {
-        List<String> tables = super.listTablesImpl(databaseName);
-        for (Map.Entry<String, TableSchema> entry : 
tableFullName2Schema.entrySet()) {
-            Identifier identifier = Identifier.fromString(entry.getKey());
-            if (databaseName.equals(identifier.getDatabaseName())) {
-                tables.add(identifier.getTableName());
-            }
-        }
-        return tables;
-    }
-
-    @Override
-    protected void dropTableImpl(Identifier identifier) {
-        if (tableFullName2Schema.containsKey(identifier.getFullName())) {
-            tableFullName2Schema.remove(identifier.getFullName());
-        } else {
-            super.dropTableImpl(identifier);
-        }
-    }
-
-    @Override
-    public void renameTableImpl(Identifier fromTable, Identifier toTable) {
-        if (tableFullName2Schema.containsKey(fromTable.getFullName())) {
-            TableSchema tableSchema = 
tableFullName2Schema.get(fromTable.getFullName());
-            tableFullName2Schema.remove(fromTable.getFullName());
-            tableFullName2Schema.put(toTable.getFullName(), tableSchema);
-        } else {
-            super.renameTableImpl(fromTable, toTable);
-        }
-    }
-
-    @Override
-    protected void alterTableImpl(Identifier identifier, List<SchemaChange> 
changes)
-            throws TableNotExistException, ColumnAlreadyExistException, 
ColumnNotExistException {
-        if (tableFullName2Schema.containsKey(identifier.getFullName())) {
-            TableSchema schema = 
tableFullName2Schema.get(identifier.getFullName());
-            Options options = Options.fromMap(schema.options());
-            if (options.get(CoreOptions.TYPE) == TableType.FORMAT_TABLE) {
-                throw new UnsupportedOperationException("Only data table 
support alter table.");
-            }
-        } else {
-            super.alterTableImpl(identifier, changes);
-        }
-    }
-
-    @Override
-    public void createFormatTable(Identifier identifier, Schema schema) {
-        Map<String, String> options = new HashMap<>(schema.options());
-        options.put("path", "/tmp/format_table");
-        TableSchema tableSchema =
-                new TableSchema(
-                        1L,
-                        schema.fields(),
-                        1,
-                        schema.partitionKeys(),
-                        schema.primaryKeys(),
-                        options,
-                        schema.comment());
-        tableFullName2Schema.put(identifier.getFullName(), tableSchema);
-    }
-
-    @Override
-    protected TableMetadata loadTableMetadata(Identifier identifier) throws 
TableNotExistException {
-        if (tableFullName2Schema.containsKey(identifier.getFullName())) {
-            TableSchema tableSchema = 
tableFullName2Schema.get(identifier.getFullName());
-            return new TableMetadata(tableSchema, false, "uuid");
-        }
-        return super.loadTableMetadata(identifier);
-    }
-
-    private Partition spec2Partition(Map<String, String> spec) {
-        return new Partition(spec, 123, 456, 789, 123);
-    }
-}


Reply via email to