This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a665e6f5 [core] Introduce CachingCatalog (#3829)
4a665e6f5 is described below

commit 4a665e6f5e50dce9c8bc75bf9cd15d419404fccd
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Jul 30 21:24:59 2024 +0800

    [core] Introduce CachingCatalog (#3829)
---
 .../generated/catalog_configuration.html           |  12 +
 .../org/apache/paimon/options/CatalogOptions.java  |  14 +
 .../org/apache/paimon/catalog/AbstractCatalog.java |  46 +---
 .../org/apache/paimon/catalog/CachingCatalog.java  | 198 ++++++++++++++
 .../java/org/apache/paimon/catalog/Catalog.java    |  41 +++
 .../org/apache/paimon/catalog/CatalogFactory.java  |  26 ++
 .../apache/paimon/catalog/FileSystemCatalog.java   |   2 +-
 .../paimon/catalog/FileSystemCatalogFactory.java   |  17 +-
 .../org/apache/paimon/schema/SchemaManager.java    |   2 +-
 .../apache/paimon/catalog/CachingCatalogTest.java  | 288 +++++++++++++++++++++
 .../org/apache/paimon/catalog/CatalogTestBase.java |  20 +-
 .../paimon/catalog/FileSystemCatalogTest.java      |  32 +++
 .../paimon/catalog/TestableCachingCatalog.java     |  57 ++++
 .../org/apache/paimon/jdbc/JdbcCatalogTest.java    |   9 -
 .../table/system/AllTableOptionsTableTest.java     |  33 +--
 .../java/org/apache/paimon/utils/FakeTicker.java   |  42 +++
 .../flink/action/cdc/SyncDatabaseActionBase.java   |   7 +-
 .../flink/action/cdc/SyncTableActionBase.java      |   6 +-
 .../mysql/MySqlSyncDatabaseTableListITCase.java    |   2 +-
 .../java/org/apache/paimon/flink/FlinkCatalog.java |   5 +-
 .../org/apache/paimon/flink/action/ActionBase.java |   7 +
 .../paimon/flink/clone/CopyFileOperator.java       |  32 ++-
 .../flink/procedure/MigrateDatabaseProcedure.java  |   7 +-
 .../flink/procedure/MigrateFileProcedure.java      |   6 +-
 .../flink/procedure/MigrateTableProcedure.java     |   7 +-
 .../paimon/flink/procedure/RepairProcedure.java    |   5 -
 .../paimon/flink/utils/TableMigrationUtils.java    |  26 +-
 .../apache/paimon/flink/BatchFileStoreITCase.java  |  35 ---
 .../paimon/flink/FileSystemCatalogITCase.java      |   3 +-
 .../org/apache/paimon/flink/FlinkCatalogTest.java  |  16 +-
 .../apache/paimon/flink/PartialUpdateITCase.java   |  43 ---
 .../paimon/flink/action/ActionITCaseBase.java      |   6 +-
 .../privilege/PrivilegeProcedureITCase.java        |   2 -
 .../java/org/apache/paimon/hive/HiveCatalog.java   |  28 +-
 .../java/org/apache/paimon/spark/SparkCatalog.java |   8 +
 .../paimon/spark/utils/TableMigrationUtils.java    |  12 +-
 .../apache/paimon/spark/SparkFileIndexITCase.java  |   2 +
 .../apache/paimon/spark/PaimonSparkTestBase.scala  |   9 +-
 38 files changed, 844 insertions(+), 269 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html 
b/docs/layouts/shortcodes/generated/catalog_configuration.html
index a583c7471..4508d898d 100644
--- a/docs/layouts/shortcodes/generated/catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/catalog_configuration.html
@@ -32,6 +32,18 @@ under the License.
             <td>Boolean</td>
             <td>Indicates whether this catalog allow upper case, its default 
value depends on the implementation of the specific catalog.</td>
         </tr>
+        <tr>
+            <td><h5>cache-enabled</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Controls whether the catalog will cache databases, tables and 
manifests.</td>
+        </tr>
+        <tr>
+            <td><h5>cache.expiration-interval</h5></td>
+            <td style="word-wrap: break-word;">1 min</td>
+            <td>Duration</td>
+            <td>Controls the duration for which databases and tables in the 
catalog are cached.</td>
+        </tr>
         <tr>
             <td><h5>client-pool-size</h5></td>
             <td style="word-wrap: break-word;">2</td>
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
index 9ef681809..980083e71 100644
--- a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
@@ -91,6 +91,20 @@ public class CatalogOptions {
                     .defaultValue(2)
                     .withDescription("Configure the size of the connection 
pool.");
 
+    public static final ConfigOption<Boolean> CACHE_ENABLED =
+            key("cache-enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Controls whether the catalog will cache 
databases, tables and manifests.");
+
+    public static final ConfigOption<Duration> CACHE_EXPIRATION_INTERVAL_MS =
+            key("cache.expiration-interval")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(60))
+                    .withDescription(
+                            "Controls the duration for which databases and 
tables in the catalog are cached.");
+
     public static final ConfigOption<String> LINEAGE_META =
             key("lineage-meta")
                     .stringType()
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index afe64666a..062e93532 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -45,7 +45,6 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -58,17 +57,12 @@ import static 
org.apache.paimon.options.CatalogOptions.ALLOW_UPPER_CASE;
 import static org.apache.paimon.options.CatalogOptions.LINEAGE_META;
 import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
 import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
-import static 
org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
 import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Common implementation of {@link Catalog}. */
 public abstract class AbstractCatalog implements Catalog {
 
-    public static final String DB_SUFFIX = ".db";
-    protected static final String TABLE_DEFAULT_OPTION_PREFIX = 
"table-default.";
-    protected static final String DB_LOCATION_PROP = "location";
-
     protected final FileIO fileIO;
     protected final Map<String, String> tableDefaultOptions;
     protected final Options catalogOptions;
@@ -86,8 +80,7 @@ public abstract class AbstractCatalog implements Catalog {
         this.fileIO = fileIO;
         this.lineageMetaFactory =
                 findAndCreateLineageMeta(options, 
AbstractCatalog.class.getClassLoader());
-        this.tableDefaultOptions =
-                convertToPropertiesPrefixKey(options.toMap(), 
TABLE_DEFAULT_OPTION_PREFIX);
+        this.tableDefaultOptions = 
Catalog.tableDefaultOptions(options.toMap());
         this.catalogOptions = options;
     }
 
@@ -445,12 +438,12 @@ public abstract class AbstractCatalog implements Catalog {
         }
     }
 
-    private static boolean isSpecifiedSystemTable(Identifier identifier) {
+    public static boolean isSpecifiedSystemTable(Identifier identifier) {
         return identifier.getObjectName().contains(SYSTEM_TABLE_SPLITTER)
                 && !getOriginalIdentifierAndBranch(identifier).isPresent();
     }
 
-    protected boolean isSystemTable(Identifier identifier) {
+    protected static boolean isSystemTable(Identifier identifier) {
         return isSystemDatabase(identifier.getDatabaseName()) || 
isSpecifiedSystemTable(identifier);
     }
 
@@ -463,11 +456,11 @@ public abstract class AbstractCatalog implements Catalog {
         }
     }
 
-    public void copyTableDefaultOptions(Map<String, String> options) {
+    private void copyTableDefaultOptions(Map<String, String> options) {
         tableDefaultOptions.forEach(options::putIfAbsent);
     }
 
-    private String[] tableAndSystemName(Identifier identifier) {
+    public static String[] tableAndSystemName(Identifier identifier) {
         String[] splits = StringUtils.split(identifier.getObjectName(), 
SYSTEM_TABLE_SPLITTER);
         if (splits.length != 2) {
             throw new IllegalArgumentException(
@@ -493,7 +486,7 @@ public abstract class AbstractCatalog implements Catalog {
         return new Path(warehouse, database + DB_SUFFIX);
     }
 
-    private boolean isSystemDatabase(String database) {
+    public static boolean isSystemDatabase(String database) {
         return SYSTEM_DATABASE_NAME.equals(database);
     }
 
@@ -504,30 +497,9 @@ public abstract class AbstractCatalog implements Catalog {
         }
     }
 
-    /** Validate database, table and field names must be lowercase when not 
case-sensitive. */
-    public static void validateCaseInsensitive(
-            boolean caseSensitive, String type, String... names) {
-        validateCaseInsensitive(caseSensitive, type, Arrays.asList(names));
-    }
-
-    /** Validate database, table and field names must be lowercase when not 
case-sensitive. */
-    public static void validateCaseInsensitive(
-            boolean caseSensitive, String type, List<String> names) {
-        if (caseSensitive) {
-            return;
-        }
-        List<String> illegalNames =
-                names.stream().filter(f -> 
!f.equals(f.toLowerCase())).collect(Collectors.toList());
-        checkArgument(
-                illegalNames.isEmpty(),
-                String.format(
-                        "%s name %s cannot contain upper case in the catalog.",
-                        type, illegalNames));
-    }
-
     protected void validateIdentifierNameCaseInsensitive(Identifier 
identifier) {
-        validateCaseInsensitive(allowUpperCase(), "Database", 
identifier.getDatabaseName());
-        validateCaseInsensitive(allowUpperCase(), "Table", 
identifier.getObjectName());
+        Catalog.validateCaseInsensitive(allowUpperCase(), "Database", 
identifier.getDatabaseName());
+        Catalog.validateCaseInsensitive(allowUpperCase(), "Table", 
identifier.getObjectName());
     }
 
     private void 
validateFieldNameCaseInsensitiveInSchemaChange(List<SchemaChange> changes) {
@@ -545,7 +517,7 @@ public abstract class AbstractCatalog implements Catalog {
     }
 
     protected void validateFieldNameCaseInsensitive(List<String> fieldNames) {
-        validateCaseInsensitive(allowUpperCase(), "Field", fieldNames);
+        Catalog.validateCaseInsensitive(allowUpperCase(), "Field", fieldNames);
     }
 
     private void validateAutoCreateClose(Map<String, String> options) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
new file mode 100644
index 000000000..b11765a9a
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.catalog;
+
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.system.SystemTableLoader;
+
+import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
+import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.RemovalCause;
+import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.RemovalListener;
+import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Ticker;
+
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.catalog.AbstractCatalog.isSpecifiedSystemTable;
+import static org.apache.paimon.catalog.AbstractCatalog.tableAndSystemName;
+import static 
org.apache.paimon.options.CatalogOptions.CACHE_EXPIRATION_INTERVAL_MS;
+import static org.apache.paimon.table.system.SystemTableLoader.SYSTEM_TABLES;
+
+/** A {@link Catalog} to cache databases and tables and manifests. */
+public class CachingCatalog extends DelegateCatalog {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(CachingCatalog.class);
+
+    protected final Cache<String, Map<String, String>> databaseCache;
+    protected final Cache<Identifier, Table> tableCache;
+
+    public CachingCatalog(Catalog wrapped) {
+        this(wrapped, CACHE_EXPIRATION_INTERVAL_MS.defaultValue());
+    }
+
+    public CachingCatalog(Catalog wrapped, Duration expirationInterval) {
+        this(wrapped, expirationInterval, Ticker.systemTicker());
+    }
+
+    public CachingCatalog(Catalog wrapped, Duration expirationInterval, Ticker 
ticker) {
+        super(wrapped);
+        if (expirationInterval.isZero() || expirationInterval.isNegative()) {
+            throw new IllegalArgumentException(
+                    "When cache.expiration-interval is set to negative or 0, 
the catalog cache should be disabled.");
+        }
+
+        this.databaseCache =
+                Caffeine.newBuilder()
+                        .softValues()
+                        .executor(Runnable::run)
+                        .expireAfterAccess(expirationInterval)
+                        .ticker(ticker)
+                        .build();
+        this.tableCache =
+                Caffeine.newBuilder()
+                        .softValues()
+                        .removalListener(new 
TableInvalidatingRemovalListener())
+                        .executor(Runnable::run)
+                        .expireAfterAccess(expirationInterval)
+                        .ticker(ticker)
+                        .build();
+    }
+
+    @Override
+    public Map<String, String> loadDatabaseProperties(String databaseName)
+            throws DatabaseNotExistException {
+        Map<String, String> properties = 
databaseCache.getIfPresent(databaseName);
+        if (properties != null) {
+            return properties;
+        }
+
+        properties = super.loadDatabaseProperties(databaseName);
+        databaseCache.put(databaseName, properties);
+        return properties;
+    }
+
+    @Override
+    public void dropDatabase(String name, boolean ignoreIfNotExists, boolean 
cascade)
+            throws DatabaseNotExistException, DatabaseNotEmptyException {
+        super.dropDatabase(name, ignoreIfNotExists, cascade);
+        databaseCache.invalidate(name);
+        if (cascade) {
+            List<Identifier> tables = new ArrayList<>();
+            for (Identifier identifier : tableCache.asMap().keySet()) {
+                if (identifier.getDatabaseName().equals(name)) {
+                    tables.add(identifier);
+                }
+            }
+            tables.forEach(tableCache::invalidate);
+        }
+    }
+
+    @Override
+    public void dropTable(Identifier identifier, boolean ignoreIfNotExists)
+            throws TableNotExistException {
+        super.dropTable(identifier, ignoreIfNotExists);
+        invalidateTable(identifier);
+    }
+
+    @Override
+    public void renameTable(Identifier fromTable, Identifier toTable, boolean 
ignoreIfNotExists)
+            throws TableNotExistException, TableAlreadyExistException {
+        super.renameTable(fromTable, toTable, ignoreIfNotExists);
+        invalidateTable(fromTable);
+    }
+
+    @Override
+    public void alterTable(
+            Identifier identifier, List<SchemaChange> changes, boolean 
ignoreIfNotExists)
+            throws TableNotExistException, ColumnAlreadyExistException, 
ColumnNotExistException {
+        super.alterTable(identifier, changes, ignoreIfNotExists);
+        invalidateTable(identifier);
+    }
+
+    @Override
+    public Table getTable(Identifier identifier) throws TableNotExistException 
{
+        Table table = tableCache.getIfPresent(identifier);
+        if (table != null) {
+            return table;
+        }
+
+        if (isSpecifiedSystemTable(identifier)) {
+            String[] splits = tableAndSystemName(identifier);
+            String tableName = splits[0];
+            String type = splits[1];
+
+            Identifier originIdentifier =
+                    Identifier.create(identifier.getDatabaseName(), tableName);
+            Table originTable = tableCache.getIfPresent(originIdentifier);
+            if (originTable == null) {
+                originTable = wrapped.getTable(originIdentifier);
+                tableCache.put(originIdentifier, originTable);
+            }
+            table = SystemTableLoader.load(type, (FileStoreTable) originTable);
+            if (table == null) {
+                throw new TableNotExistException(identifier);
+            }
+            tableCache.put(identifier, table);
+            return table;
+        }
+
+        table = wrapped.getTable(identifier);
+        tableCache.put(identifier, table);
+        return table;
+    }
+
+    private class TableInvalidatingRemovalListener implements 
RemovalListener<Identifier, Table> {
+        @Override
+        public void onRemoval(Identifier identifier, Table table, @NonNull 
RemovalCause cause) {
+            LOG.debug("Evicted {} from the table cache ({})", identifier, 
cause);
+            if (RemovalCause.EXPIRED.equals(cause)) {
+                tryInvalidateSysTables(identifier);
+            }
+        }
+    }
+
+    @Override
+    public void invalidateTable(Identifier identifier) {
+        tableCache.invalidate(identifier);
+        tryInvalidateSysTables(identifier);
+    }
+
+    private void tryInvalidateSysTables(Identifier identifier) {
+        if (!isSpecifiedSystemTable(identifier)) {
+            tableCache.invalidateAll(allSystemTables(identifier));
+        }
+    }
+
+    private static Iterable<Identifier> allSystemTables(Identifier ident) {
+        List<Identifier> tables = new ArrayList<>();
+        for (String type : SYSTEM_TABLES) {
+            tables.add(Identifier.fromString(ident.getFullName() + 
SYSTEM_TABLE_SPLITTER + type));
+        }
+        return tables;
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index fe8e0b68b..ac229464b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -26,10 +26,15 @@ import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.table.Table;
 
 import java.io.Serializable;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static 
org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /**
  * This interface is responsible for reading and writing metadata such as 
database/table from a
@@ -47,6 +52,9 @@ public interface Catalog extends AutoCloseable {
     String SYSTEM_TABLE_SPLITTER = "$";
     String SYSTEM_DATABASE_NAME = "sys";
     String COMMENT_PROP = "comment";
+    String TABLE_DEFAULT_OPTION_PREFIX = "table-default.";
+    String DB_LOCATION_PROP = "location";
+    String DB_SUFFIX = ".db";
 
     /** Warehouse root path containing all database directories in this 
catalog. */
     String warehouse();
@@ -235,6 +243,16 @@ public interface Catalog extends AutoCloseable {
     void alterTable(Identifier identifier, List<SchemaChange> changes, boolean 
ignoreIfNotExists)
             throws TableNotExistException, ColumnAlreadyExistException, 
ColumnNotExistException;
 
+    /**
+     * Invalidate cached table metadata for an {@link Identifier identifier}.
+     *
+     * <p>If the table is already loaded or cached, drop cached data. If the 
table does not exist or
+     * is not cached, do nothing. Calling this method should not query remote 
services.
+     *
+     * @param identifier a table identifier
+     */
+    default void invalidateTable(Identifier identifier) {}
+
     /**
      * Drop the partition of the specify table.
      *
@@ -277,6 +295,29 @@ public interface Catalog extends AutoCloseable {
         throw new UnsupportedOperationException();
     }
 
+    static Map<String, String> tableDefaultOptions(Map<String, String> 
options) {
+        return convertToPropertiesPrefixKey(options, 
TABLE_DEFAULT_OPTION_PREFIX);
+    }
+
+    /** Validate database, table and field names must be lowercase when not 
case-sensitive. */
+    static void validateCaseInsensitive(boolean caseSensitive, String type, 
String... names) {
+        validateCaseInsensitive(caseSensitive, type, Arrays.asList(names));
+    }
+
+    /** Validate database, table and field names must be lowercase when not 
case-sensitive. */
+    static void validateCaseInsensitive(boolean caseSensitive, String type, 
List<String> names) {
+        if (caseSensitive) {
+            return;
+        }
+        List<String> illegalNames =
+                names.stream().filter(f -> 
!f.equals(f.toLowerCase())).collect(Collectors.toList());
+        checkArgument(
+                illegalNames.isEmpty(),
+                String.format(
+                        "%s name %s cannot contain upper case in the catalog.",
+                        type, illegalNames));
+    }
+
     /** Exception for trying to drop on a database that is not empty. */
     class DatabaseNotEmptyException extends Exception {
         private static final String MSG = "Database %s is not empty.";
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java
index c153f0ee3..2eff8e902 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java
@@ -24,11 +24,16 @@ import org.apache.paimon.factories.FactoryUtil;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.privilege.FileBasedPrivilegeManager;
+import org.apache.paimon.privilege.PrivilegeManager;
+import org.apache.paimon.privilege.PrivilegedCatalog;
 import org.apache.paimon.utils.Preconditions;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
 
+import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED;
+import static 
org.apache.paimon.options.CatalogOptions.CACHE_EXPIRATION_INTERVAL_MS;
 import static org.apache.paimon.options.CatalogOptions.METASTORE;
 import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
 
@@ -67,6 +72,27 @@ public interface CatalogFactory extends Factory {
     }
 
     static Catalog createCatalog(CatalogContext context, ClassLoader 
classLoader) {
+        Catalog catalog = createUnwrappedCatalog(context, classLoader);
+
+        Options options = context.options();
+        if (options.get(CACHE_ENABLED)) {
+            catalog = new CachingCatalog(catalog, 
options.get(CACHE_EXPIRATION_INTERVAL_MS));
+        }
+
+        PrivilegeManager privilegeManager =
+                new FileBasedPrivilegeManager(
+                        catalog.warehouse(),
+                        catalog.fileIO(),
+                        context.options().get(PrivilegedCatalog.USER),
+                        context.options().get(PrivilegedCatalog.PASSWORD));
+        if (privilegeManager.privilegeEnabled()) {
+            catalog = new PrivilegedCatalog(catalog, privilegeManager);
+        }
+
+        return catalog;
+    }
+
+    static Catalog createUnwrappedCatalog(CatalogContext context, ClassLoader 
classLoader) {
         Options options = context.options();
         String metastore = options.get(METASTORE);
         CatalogFactory catalogFactory =
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
index c2ff37601..64f38a106 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
@@ -63,7 +63,7 @@ public class FileSystemCatalog extends AbstractCatalog {
 
     @Override
     protected void createDatabaseImpl(String name, Map<String, String> 
properties) {
-        if (properties.containsKey(AbstractCatalog.DB_LOCATION_PROP)) {
+        if (properties.containsKey(Catalog.DB_LOCATION_PROP)) {
             throw new IllegalArgumentException(
                     "Cannot specify location for a database when using 
fileSystem catalog.");
         }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java
 
b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java
index 8a0b1643f..6ab3664e4 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java
@@ -20,9 +20,6 @@ package org.apache.paimon.catalog;
 
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
-import org.apache.paimon.privilege.FileBasedPrivilegeManager;
-import org.apache.paimon.privilege.PrivilegeManager;
-import org.apache.paimon.privilege.PrivilegedCatalog;
 import org.apache.paimon.table.TableType;
 
 import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE;
@@ -44,18 +41,6 @@ public class FileSystemCatalogFactory implements 
CatalogFactory {
                     "Only managed table is supported in File system catalog.");
         }
 
-        Catalog catalog = new FileSystemCatalog(fileIO, warehouse, 
context.options());
-
-        PrivilegeManager privilegeManager =
-                new FileBasedPrivilegeManager(
-                        warehouse.toString(),
-                        fileIO,
-                        context.options().get(PrivilegedCatalog.USER),
-                        context.options().get(PrivilegedCatalog.PASSWORD));
-        if (privilegeManager.privilegeEnabled()) {
-            catalog = new PrivilegedCatalog(catalog, privilegeManager);
-        }
-
-        return catalog;
+        return new FileSystemCatalog(fileIO, warehouse, context.options());
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 684adfe9d..3c9db09aa 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -66,7 +66,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
-import static org.apache.paimon.catalog.AbstractCatalog.DB_SUFFIX;
+import static org.apache.paimon.catalog.Catalog.DB_SUFFIX;
 import static org.apache.paimon.catalog.Identifier.UNKNOWN_DATABASE;
 import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
 import static org.apache.paimon.utils.FileUtils.listVersionedFiles;
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java 
b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
new file mode 100644
index 000000000..56fc238a8
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.catalog;
+
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.system.SystemTableLoader;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.FakeTicker;
+
+import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+class CachingCatalogTest extends CatalogTestBase {
+
+    private static final Duration EXPIRATION_TTL = Duration.ofMinutes(5);
+    private static final Duration HALF_OF_EXPIRATION = 
EXPIRATION_TTL.dividedBy(2);
+
+    private FakeTicker ticker;
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        super.setUp();
+        catalog = new FileSystemCatalog(fileIO, new Path(warehouse));
+        ticker = new FakeTicker();
+        catalog.createDatabase("db", false);
+    }
+
+    @Override
+    @Test
+    public void testListDatabasesWhenNoDatabases() {
+        List<String> databases = catalog.listDatabases();
+        assertThat(databases).contains("db");
+    }
+
+    @Test
+    public void testInvalidateSystemTablesIfBaseTableIsModified() throws 
Exception {
+        Catalog catalog = new CachingCatalog(this.catalog);
+        Identifier tableIdent = new Identifier("db", "tbl");
+        catalog.createTable(new Identifier("db", "tbl"), DEFAULT_TABLE_SCHEMA, 
false);
+        Identifier sysIdent = new Identifier("db", "tbl$files");
+        Table sysTable = catalog.getTable(sysIdent);
+        catalog.alterTable(tableIdent, SchemaChange.addColumn("col3", 
DataTypes.INT()), false);
+        assertThat(catalog.getTable(sysIdent)).isNotSameAs(sysTable);
+    }
+
+    @Test
+    public void testInvalidateSysTablesIfBaseTableIsDropped() throws Exception 
{
+        Catalog catalog = new CachingCatalog(this.catalog);
+        Identifier tableIdent = new Identifier("db", "tbl");
+        catalog.createTable(new Identifier("db", "tbl"), DEFAULT_TABLE_SCHEMA, 
false);
+        Identifier sysIdent = new Identifier("db", "tbl$files");
+        catalog.getTable(sysIdent);
+        catalog.dropTable(tableIdent, false);
+        assertThatThrownBy(() -> catalog.getTable(sysIdent))
+                .hasMessage("Table db.tbl does not exist.");
+    }
+
+    @Test
+    public void testTableExpiresAfterInterval() throws Exception {
+        TestableCachingCatalog catalog =
+                new TestableCachingCatalog(this.catalog, EXPIRATION_TTL, 
ticker);
+
+        Identifier tableIdent = new Identifier("db", "tbl");
+        catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false);
+        Table table = catalog.getTable(tableIdent);
+
+        // Ensure table is cached with full ttl remaining upon creation
+        assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+        
assertThat(catalog.remainingAgeFor(tableIdent)).isPresent().get().isEqualTo(EXPIRATION_TTL);
+
+        ticker.advance(HALF_OF_EXPIRATION);
+        assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+        
assertThat(catalog.ageOf(tableIdent)).isPresent().get().isEqualTo(HALF_OF_EXPIRATION);
+
+        ticker.advance(HALF_OF_EXPIRATION.plus(Duration.ofSeconds(10)));
+        assertThat(catalog.cache().asMap()).doesNotContainKey(tableIdent);
+        assertThat(catalog.getTable(tableIdent))
+                .as("CachingCatalog should return a new instance after 
expiration")
+                .isNotSameAs(table);
+    }
+
+    @Test
+    public void testCatalogExpirationTtlRefreshesAfterAccessViaCatalog() 
throws Exception {
+        TestableCachingCatalog catalog =
+                new TestableCachingCatalog(this.catalog, EXPIRATION_TTL, 
ticker);
+
+        Identifier tableIdent = new Identifier("db", "tbl");
+        catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false);
+        catalog.getTable(tableIdent);
+        assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+        
assertThat(catalog.ageOf(tableIdent)).isPresent().get().isEqualTo(Duration.ZERO);
+
+        ticker.advance(HALF_OF_EXPIRATION);
+        assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+        
assertThat(catalog.ageOf(tableIdent)).isPresent().get().isEqualTo(HALF_OF_EXPIRATION);
+        assertThat(catalog.remainingAgeFor(tableIdent))
+                .isPresent()
+                .get()
+                .isEqualTo(HALF_OF_EXPIRATION);
+
+        Duration oneMinute = Duration.ofMinutes(1L);
+        ticker.advance(oneMinute);
+        assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+        assertThat(catalog.ageOf(tableIdent))
+                .isPresent()
+                .get()
+                .isEqualTo(HALF_OF_EXPIRATION.plus(oneMinute));
+        assertThat(catalog.remainingAgeFor(tableIdent))
+                .get()
+                .isEqualTo(HALF_OF_EXPIRATION.minus(oneMinute));
+
+        // Access the table via the catalog, which should refresh the TTL
+        Table table = catalog.getTable(tableIdent);
+        assertThat(catalog.ageOf(tableIdent)).get().isEqualTo(Duration.ZERO);
+        
assertThat(catalog.remainingAgeFor(tableIdent)).get().isEqualTo(EXPIRATION_TTL);
+
+        ticker.advance(HALF_OF_EXPIRATION);
+        
assertThat(catalog.ageOf(tableIdent)).get().isEqualTo(HALF_OF_EXPIRATION);
+        
assertThat(catalog.remainingAgeFor(tableIdent)).get().isEqualTo(HALF_OF_EXPIRATION);
+    }
+
+    @Test
+    public void testCacheExpirationEagerlyRemovesSysTables() throws Exception {
+        TestableCachingCatalog catalog =
+                new TestableCachingCatalog(this.catalog, EXPIRATION_TTL, 
ticker);
+
+        Identifier tableIdent = new Identifier("db", "tbl");
+        catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false);
+        Table table = catalog.getTable(tableIdent);
+        assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+        assertThat(catalog.ageOf(tableIdent)).get().isEqualTo(Duration.ZERO);
+
+        ticker.advance(HALF_OF_EXPIRATION);
+        assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+        
assertThat(catalog.ageOf(tableIdent)).get().isEqualTo(HALF_OF_EXPIRATION);
+
+        for (Identifier sysTable : sysTables(tableIdent)) {
+            catalog.getTable(sysTable);
+        }
+        
assertThat(catalog.cache().asMap()).containsKeys(sysTables(tableIdent));
+        assertThat(Arrays.stream(sysTables(tableIdent)).map(catalog::ageOf))
+                .isNotEmpty()
+                .allMatch(age -> age.isPresent() && 
age.get().equals(Duration.ZERO));
+
+        assertThat(catalog.remainingAgeFor(tableIdent))
+                .as("Loading a non-cached sys table should refresh the main 
table's age")
+                .isEqualTo(Optional.of(EXPIRATION_TTL));
+
+        // Move time forward and access already cached sys tables.
+        ticker.advance(HALF_OF_EXPIRATION);
+        for (Identifier sysTable : sysTables(tableIdent)) {
+            catalog.getTable(sysTable);
+        }
+        assertThat(Arrays.stream(sysTables(tableIdent)).map(catalog::ageOf))
+                .isNotEmpty()
+                .allMatch(age -> age.isPresent() && 
age.get().equals(Duration.ZERO));
+
+        assertThat(catalog.remainingAgeFor(tableIdent))
+                .as("Accessing a cached sys table should not affect the main 
table's age")
+                .isEqualTo(Optional.of(HALF_OF_EXPIRATION));
+
+        // Move time forward so the data table drops.
+        ticker.advance(HALF_OF_EXPIRATION);
+        assertThat(catalog.cache().asMap()).doesNotContainKey(tableIdent);
+
+        Arrays.stream(sysTables(tableIdent))
+                .forEach(
+                        sysTable ->
+                                assertThat(catalog.cache().asMap())
+                                        .as(
+                                                "When a data table expires, 
its sys tables should expire regardless of age")
+                                        .doesNotContainKeys(sysTable));
+    }
+
+    @Test
+    public void testDeadlock() throws Exception {
+        Catalog underlyCatalog = this.catalog;
+        TestableCachingCatalog catalog =
+                new TestableCachingCatalog(this.catalog, 
Duration.ofSeconds(1), ticker);
+        int numThreads = 20;
+        List<Identifier> createdTables = new ArrayList<>();
+        for (int i = 0; i < numThreads; i++) {
+            Identifier tableIdent = new Identifier("db", "tbl" + i);
+            catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false);
+            createdTables.add(tableIdent);
+        }
+
+        Cache<Identifier, Table> cache = catalog.cache();
+        AtomicInteger cacheGetCount = new AtomicInteger(0);
+        AtomicInteger cacheCleanupCount = new AtomicInteger(0);
+        ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+        for (int i = 0; i < numThreads; i++) {
+            if (i % 2 == 0) {
+                String table = "tbl" + i;
+                executor.submit(
+                        () -> {
+                            ticker.advance(Duration.ofSeconds(2));
+                            cache.get(
+                                    new Identifier("db", table),
+                                    identifier -> {
+                                        try {
+                                            return 
underlyCatalog.getTable(identifier);
+                                        } catch 
(Catalog.TableNotExistException e) {
+                                            throw new RuntimeException(e);
+                                        }
+                                    });
+                            cacheGetCount.incrementAndGet();
+                        });
+            } else {
+                executor.submit(
+                        () -> {
+                            ticker.advance(Duration.ofSeconds(2));
+                            cache.cleanUp();
+                            cacheCleanupCount.incrementAndGet();
+                        });
+            }
+        }
+        executor.awaitTermination(2, TimeUnit.SECONDS);
+        assertThat(cacheGetCount).hasValue(numThreads / 2);
+        assertThat(cacheCleanupCount).hasValue(numThreads / 2);
+
+        executor.shutdown();
+    }
+
+    @Test
+    public void testCachingCatalogRejectsExpirationIntervalOfZero() {
+        Assertions.assertThatThrownBy(
+                        () -> new TestableCachingCatalog(this.catalog, 
Duration.ZERO, ticker))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage(
+                        "When cache.expiration-interval is set to negative or 
0, the catalog cache should be disabled.");
+    }
+
+    @Test
+    public void testInvalidateTableForChainedCachingCatalogs() throws 
Exception {
+        TestableCachingCatalog wrappedCatalog =
+                new TestableCachingCatalog(this.catalog, EXPIRATION_TTL, 
ticker);
+        TestableCachingCatalog catalog =
+                new TestableCachingCatalog(wrappedCatalog, EXPIRATION_TTL, 
ticker);
+        Identifier tableIdent = new Identifier("db", "tbl");
+        catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false);
+        catalog.getTable(tableIdent);
+        assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+        catalog.dropTable(tableIdent, false);
+        assertThat(catalog.cache().asMap()).doesNotContainKey(tableIdent);
+        
assertThat(wrappedCatalog.cache().asMap()).doesNotContainKey(tableIdent);
+    }
+
+    public static Identifier[] sysTables(Identifier tableIdent) {
+        return SystemTableLoader.SYSTEM_TABLES.stream()
+                .map(type -> Identifier.fromString(tableIdent.getFullName() + 
"$" + type))
+                .toArray(Identifier[]::new);
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
index 378ebf159..19c9e83c4 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
@@ -38,7 +38,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
-import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
@@ -55,6 +55,7 @@ public abstract class CatalogTestBase {
     protected String warehouse;
     protected FileIO fileIO;
     protected Catalog catalog;
+
     protected static final Schema DEFAULT_TABLE_SCHEMA =
             new Schema(
                     Lists.newArrayList(
@@ -66,18 +67,6 @@ public abstract class CatalogTestBase {
                     Maps.newHashMap(),
                     "");
 
-    protected static final Schema PARTITION_SCHEMA =
-            new Schema(
-                    Lists.newArrayList(
-                            new DataField(0, "pk1", DataTypes.INT()),
-                            new DataField(1, "pk2", DataTypes.STRING()),
-                            new DataField(3, "pk3", DataTypes.STRING()),
-                            new DataField(4, "col", DataTypes.STRING())),
-                    Arrays.asList("pk1", "pk2"),
-                    Arrays.asList("pk1", "pk2", "pk3"),
-                    Maps.newHashMap(),
-                    "");
-
     @BeforeEach
     public void setUp() throws Exception {
         warehouse = tempFile.toUri().toString();
@@ -95,7 +84,10 @@ public abstract class CatalogTestBase {
     }
 
     @Test
-    public abstract void testListDatabasesWhenNoDatabases();
+    public void testListDatabasesWhenNoDatabases() {
+        List<String> databases = catalog.listDatabases();
+        assertThat(databases).isEqualTo(new ArrayList<>());
+    }
 
     @Test
     public void testListDatabases() throws Exception {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java
new file mode 100644
index 000000000..67fbd2718
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.catalog;
+
+import org.apache.paimon.fs.Path;
+
+import org.junit.jupiter.api.BeforeEach;
+
+class FileSystemCatalogTest extends CatalogTestBase {
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        super.setUp();
+        catalog = new FileSystemCatalog(fileIO, new Path(warehouse));
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java
 
b/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java
new file mode 100644
index 000000000..c393d3aff
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.catalog;
+
+import org.apache.paimon.table.Table;
+
+import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Ticker;
+
+import java.time.Duration;
+import java.util.Optional;
+
+/**
+ * A wrapper around CachingCatalog that provides accessor methods to test the 
underlying cache,
+ * without making those fields public in the CachingCatalog itself.
+ */
+public class TestableCachingCatalog extends CachingCatalog {
+
+    private final Duration cacheExpirationInterval;
+
+    public TestableCachingCatalog(Catalog catalog, Duration 
expirationInterval, Ticker ticker) {
+        super(catalog, expirationInterval, ticker);
+        this.cacheExpirationInterval = expirationInterval;
+    }
+
+    public Cache<Identifier, Table> cache() {
+        // cleanUp must be called as tests apply assertions directly on the 
underlying map, but
+        // metadata
+        // table map entries are cleaned up asynchronously.
+        tableCache.cleanUp();
+        return tableCache;
+    }
+
+    public Optional<Duration> ageOf(Identifier identifier) {
+        return tableCache.policy().expireAfterAccess().get().ageOf(identifier);
+    }
+
+    public Optional<Duration> remainingAgeFor(Identifier identifier) {
+        return ageOf(identifier).map(cacheExpirationInterval::minus);
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java 
b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
index f0c84eb2c..f5befc724 100644
--- a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
@@ -32,8 +32,6 @@ import org.junit.jupiter.api.Test;
 import java.io.ByteArrayOutputStream;
 import java.io.ObjectOutputStream;
 import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
@@ -88,13 +86,6 @@ public class JdbcCatalogTest extends CatalogTestBase {
                 .isTrue();
     }
 
-    @Test
-    @Override
-    public void testListDatabasesWhenNoDatabases() {
-        List<String> databases = catalog.listDatabases();
-        assertThat(databases).isEqualTo(new ArrayList<>());
-    }
-
     @Test
     public void testCheckIdentifierUpperCase() throws Exception {
         catalog.createDatabase("test_db", false);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/system/AllTableOptionsTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/system/AllTableOptionsTableTest.java
index f2a1efce5..764c0f4e1 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/system/AllTableOptionsTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/system/AllTableOptionsTableTest.java
@@ -18,10 +18,7 @@
 
 package org.apache.paimon.table.system;
 
-import org.apache.paimon.catalog.AbstractCatalog;
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.fs.Path;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.TableTestBase;
 import org.apache.paimon.types.DataTypes;
@@ -29,18 +26,12 @@ import org.apache.paimon.types.DataTypes;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
-import java.util.Spliterator;
-import java.util.Spliterators;
+import java.util.Objects;
 import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
 
 import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME;
 import static 
org.apache.paimon.table.system.AllTableOptionsTable.ALL_TABLE_OPTIONS;
-import static org.apache.paimon.table.system.AllTableOptionsTable.options;
-import static org.apache.paimon.table.system.AllTableOptionsTable.toRow;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Unit tests for {@link AllTableOptionsTable}. */
@@ -69,18 +60,14 @@ public class AllTableOptionsTableTest extends TableTestBase 
{
 
     @Test
     public void testSchemasTable() throws Exception {
-        List<InternalRow> expectRow = getExceptedResult();
-        List<InternalRow> result = read(allTableOptionsTable);
-        assertThat(result).containsExactlyElementsOf(expectRow);
-    }
-
-    private List<InternalRow> getExceptedResult() {
-        AbstractCatalog abstractCatalog = (AbstractCatalog) catalog;
-        Map<String, Map<String, Path>> stringMapMap = 
abstractCatalog.allTablePaths();
-        Iterator<InternalRow> rows =
-                toRow(options(((AbstractCatalog) catalog).fileIO(), 
stringMapMap));
-        return StreamSupport.stream(
-                        Spliterators.spliteratorUnknownSize(rows, 
Spliterator.ORDERED), false)
-                .collect(Collectors.toList());
+        List<String> result =
+                read(allTableOptionsTable).stream()
+                        .map(Objects::toString)
+                        .collect(Collectors.toList());
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        "+I(default,T,fields.sales.aggregate-function,sum)",
+                        "+I(default,T,merge-engine,aggregation)",
+                        "+I(default,T,fields.price.aggregate-function,max)");
     }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/FakeTicker.java 
b/paimon-core/src/test/java/org/apache/paimon/utils/FakeTicker.java
new file mode 100644
index 000000000..cbd536ad5
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/FakeTicker.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Ticker;
+
+import java.time.Duration;
+import java.util.concurrent.atomic.AtomicLong;
+
+/** A {@code Ticker} whose value can be advanced programmatically in tests. */
+public class FakeTicker implements Ticker {
+
+    private final AtomicLong nanos = new AtomicLong();
+
+    public FakeTicker() {}
+
+    public FakeTicker advance(Duration duration) {
+        nanos.addAndGet(duration.toNanos());
+        return this;
+    }
+
+    @Override
+    public long read() {
+        return nanos.get();
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
index d6c4c12ec..6c62bcc0c 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.flink.action.cdc;
 
-import org.apache.paimon.catalog.AbstractCatalog;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.flink.action.Action;
 import org.apache.paimon.flink.action.MultiTablesSinkMode;
@@ -120,9 +119,9 @@ public abstract class SyncDatabaseActionBase extends 
SynchronizationActionBase {
 
     @Override
     protected void validateCaseSensitivity() {
-        AbstractCatalog.validateCaseInsensitive(allowUpperCase, "Database", 
database);
-        AbstractCatalog.validateCaseInsensitive(allowUpperCase, "Table 
prefix", tablePrefix);
-        AbstractCatalog.validateCaseInsensitive(allowUpperCase, "Table 
suffix", tableSuffix);
+        Catalog.validateCaseInsensitive(allowUpperCase, "Database", database);
+        Catalog.validateCaseInsensitive(allowUpperCase, "Table prefix", 
tablePrefix);
+        Catalog.validateCaseInsensitive(allowUpperCase, "Table suffix", 
tableSuffix);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
index be8cedf0b..4c7db6d28 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
@@ -19,7 +19,7 @@
 package org.apache.paimon.flink.action.cdc;
 
 import org.apache.paimon.annotation.VisibleForTesting;
-import org.apache.paimon.catalog.AbstractCatalog;
+import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.action.Action;
@@ -114,8 +114,8 @@ public abstract class SyncTableActionBase extends 
SynchronizationActionBase {
 
     @Override
     protected void validateCaseSensitivity() {
-        AbstractCatalog.validateCaseInsensitive(allowUpperCase, "Database", 
database);
-        AbstractCatalog.validateCaseInsensitive(allowUpperCase, "Table", 
table);
+        Catalog.validateCaseInsensitive(allowUpperCase, "Database", database);
+        Catalog.validateCaseInsensitive(allowUpperCase, "Table", table);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java
index 816b7b903..c15cbf898 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java
@@ -45,7 +45,7 @@ public class MySqlSyncDatabaseTableListITCase extends 
MySqlActionITCaseBase {
     }
 
     @Test
-    @Timeout(120)
+    @Timeout(180)
     public void testActionRunResult() throws Exception {
         Map<String, String> mySqlConfig = getBasicMySqlConfig();
         mySqlConfig.put(
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index 98efd479f..2cb96e007 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -347,10 +347,7 @@ public class FlinkCatalog extends AbstractCatalog {
             // Although catalog.createTable will copy the default options, but 
we need this info
             // here before create table, such as 
table-default.kafka.bootstrap.servers defined in
             // catalog options. Temporarily, we copy the default options here.
-            if (catalog instanceof org.apache.paimon.catalog.AbstractCatalog) {
-                ((org.apache.paimon.catalog.AbstractCatalog) catalog)
-                        .copyTableDefaultOptions(options);
-            }
+            
Catalog.tableDefaultOptions(catalog.options()).forEach(options::putIfAbsent);
             options.put(REGISTER_TIMEOUT.key(), 
logStoreAutoRegisterTimeout.toString());
             registerLogSystem(catalog, identifier, options, classLoader);
         }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
index dd32c52c6..30e32d62e 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
@@ -40,6 +40,8 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED;
+
 /** Abstract base of {@link Action} for table. */
 public abstract class ActionBase implements Action {
 
@@ -55,6 +57,11 @@ public abstract class ActionBase implements Action {
         catalogOptions = Options.fromMap(catalogConfig);
         catalogOptions.set(CatalogOptions.WAREHOUSE, warehouse);
 
+        // disable cache to avoid concurrent modification exception
+        if (!catalogOptions.contains(CACHE_ENABLED)) {
+            catalogOptions.set(CACHE_ENABLED, false);
+        }
+
         catalog = initPaimonCatalog();
         flinkCatalog = initFlinkCatalog();
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java
index 5b320f4e9..e5c833606 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.flink.clone;
 
-import org.apache.paimon.catalog.AbstractCatalog;
+import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.FlinkCatalogFactory;
 import org.apache.paimon.fs.FileIO;
@@ -34,6 +34,8 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
+import static org.apache.paimon.CoreOptions.PATH;
+
 /** A Operator to copy files. */
 public class CopyFileOperator extends AbstractStreamOperator<CloneFileInfo>
         implements OneInputStreamOperator<CloneFileInfo, CloneFileInfo> {
@@ -43,8 +45,8 @@ public class CopyFileOperator extends 
AbstractStreamOperator<CloneFileInfo>
     private final Map<String, String> sourceCatalogConfig;
     private final Map<String, String> targetCatalogConfig;
 
-    private AbstractCatalog sourceCatalog;
-    private AbstractCatalog targetCatalog;
+    private Catalog sourceCatalog;
+    private Catalog targetCatalog;
 
     public CopyFileOperator(
             Map<String, String> sourceCatalogConfig, Map<String, String> 
targetCatalogConfig) {
@@ -55,13 +57,9 @@ public class CopyFileOperator extends 
AbstractStreamOperator<CloneFileInfo>
     @Override
     public void open() throws Exception {
         sourceCatalog =
-                (AbstractCatalog)
-                        FlinkCatalogFactory.createPaimonCatalog(
-                                Options.fromMap(sourceCatalogConfig));
+                
FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(sourceCatalogConfig));
         targetCatalog =
-                (AbstractCatalog)
-                        FlinkCatalogFactory.createPaimonCatalog(
-                                Options.fromMap(targetCatalogConfig));
+                
FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(targetCatalogConfig));
     }
 
     @Override
@@ -71,11 +69,19 @@ public class CopyFileOperator extends 
AbstractStreamOperator<CloneFileInfo>
         FileIO sourceTableFileIO = sourceCatalog.fileIO();
         FileIO targetTableFileIO = targetCatalog.fileIO();
         Path sourceTableRootPath =
-                sourceCatalog.getDataTableLocation(
-                        
Identifier.fromString(cloneFileInfo.getSourceIdentifier()));
+                new Path(
+                        sourceCatalog
+                                .getTable(
+                                        
Identifier.fromString(cloneFileInfo.getSourceIdentifier()))
+                                .options()
+                                .get(PATH.key()));
         Path targetTableRootPath =
-                targetCatalog.getDataTableLocation(
-                        
Identifier.fromString(cloneFileInfo.getTargetIdentifier()));
+                new Path(
+                        targetCatalog
+                                .getTable(
+                                        
Identifier.fromString(cloneFileInfo.getTargetIdentifier()))
+                                .options()
+                                .get(PATH.key()));
 
         String filePathExcludeTableRoot = 
cloneFileInfo.getFilePathExcludeTableRoot();
         Path sourcePath = new Path(sourceTableRootPath + 
filePathExcludeTableRoot);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateDatabaseProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateDatabaseProcedure.java
index 166544eb7..128875a8b 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateDatabaseProcedure.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateDatabaseProcedure.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.flink.procedure;
 
 import org.apache.paimon.flink.utils.TableMigrationUtils;
-import org.apache.paimon.hive.HiveCatalog;
 import org.apache.paimon.migrate.Migrator;
 import org.apache.paimon.utils.ParameterUtils;
 
@@ -47,14 +46,10 @@ public class MigrateDatabaseProcedure extends ProcedureBase 
{
             String sourceDatabasePath,
             String properties)
             throws Exception {
-        if (!(catalog instanceof HiveCatalog)) {
-            throw new IllegalArgumentException("Only support Hive Catalog");
-        }
-        HiveCatalog hiveCatalog = (HiveCatalog) this.catalog;
         List<Migrator> migrators =
                 TableMigrationUtils.getImporters(
                         connector,
-                        hiveCatalog,
+                        catalog,
                         sourceDatabasePath,
                         
ParameterUtils.parseCommaSeparatedKeyValues(properties));
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
index c9a273336..110b4e25f 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
@@ -20,7 +20,6 @@ package org.apache.paimon.flink.procedure;
 
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.utils.TableMigrationUtils;
-import org.apache.paimon.hive.HiveCatalog;
 import org.apache.paimon.migrate.Migrator;
 
 import org.apache.flink.table.procedure.ProcedureContext;
@@ -62,9 +61,6 @@ public class MigrateFileProcedure extends ProcedureBase {
             String targetPaimonTablePath,
             boolean deleteOrigin)
             throws Exception {
-        if (!(catalog instanceof HiveCatalog)) {
-            throw new IllegalArgumentException("Only support Hive Catalog");
-        }
         Identifier sourceTableId = Identifier.fromString(sourceTablePath);
         Identifier targetTableId = 
Identifier.fromString(targetPaimonTablePath);
 
@@ -76,7 +72,7 @@ public class MigrateFileProcedure extends ProcedureBase {
         Migrator importer =
                 TableMigrationUtils.getImporter(
                         connector,
-                        (HiveCatalog) catalog,
+                        catalog,
                         sourceTableId.getDatabaseName(),
                         sourceTableId.getObjectName(),
                         targetTableId.getDatabaseName(),
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
index f721f8925..39e6092d8 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
@@ -20,7 +20,6 @@ package org.apache.paimon.flink.procedure;
 
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.utils.TableMigrationUtils;
-import org.apache.paimon.hive.HiveCatalog;
 import org.apache.paimon.utils.ParameterUtils;
 
 import org.apache.flink.table.procedure.ProcedureContext;
@@ -51,10 +50,6 @@ public class MigrateTableProcedure extends ProcedureBase {
             String sourceTablePath,
             String properties)
             throws Exception {
-        if (!(catalog instanceof HiveCatalog)) {
-            throw new IllegalArgumentException("Only support Hive Catalog");
-        }
-
         String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX;
 
         Identifier sourceTableId = Identifier.fromString(sourceTablePath);
@@ -62,7 +57,7 @@ public class MigrateTableProcedure extends ProcedureBase {
 
         TableMigrationUtils.getImporter(
                         connector,
-                        (HiveCatalog) catalog,
+                        catalog,
                         sourceTableId.getDatabaseName(),
                         sourceTableId.getObjectName(),
                         targetTableId.getDatabaseName(),
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RepairProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RepairProcedure.java
index 0c208116b..196adf475 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RepairProcedure.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RepairProcedure.java
@@ -20,7 +20,6 @@ package org.apache.paimon.flink.procedure;
 
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.hive.HiveCatalog;
 import org.apache.paimon.utils.StringUtils;
 
 import org.apache.flink.table.procedure.ProcedureContext;
@@ -55,10 +54,6 @@ public class RepairProcedure extends ProcedureBase {
 
     public String[] call(ProcedureContext procedureContext, String identifier)
             throws Catalog.DatabaseNotExistException, 
Catalog.TableNotExistException {
-        if (!(catalog instanceof HiveCatalog)) {
-            throw new IllegalArgumentException("Only support Hive Catalog");
-        }
-
         if (StringUtils.isBlank(identifier)) {
             catalog.repairCatalog();
             return new String[] {"Success"};
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java
index 32d017be6..47655d39a 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java
@@ -18,6 +18,8 @@
 
 package org.apache.paimon.flink.utils;
 
+import org.apache.paimon.catalog.CachingCatalog;
+import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.hive.HiveCatalog;
 import org.apache.paimon.hive.migrate.HiveMigrator;
 import org.apache.paimon.migrate.Migrator;
@@ -30,7 +32,7 @@ public class TableMigrationUtils {
 
     public static Migrator getImporter(
             String connector,
-            HiveCatalog paimonCatalog,
+            Catalog catalog,
             String sourceDatabase,
             String souceTableName,
             String targetDatabase,
@@ -38,8 +40,14 @@ public class TableMigrationUtils {
             Map<String, String> options) {
         switch (connector) {
             case "hive":
+                if (catalog instanceof CachingCatalog) {
+                    catalog = ((CachingCatalog) catalog).wrapped();
+                }
+                if (!(catalog instanceof HiveCatalog)) {
+                    throw new IllegalArgumentException("Only support Hive 
Catalog");
+                }
                 return new HiveMigrator(
-                        paimonCatalog,
+                        (HiveCatalog) catalog,
                         sourceDatabase,
                         souceTableName,
                         targetDatabase,
@@ -51,13 +59,17 @@ public class TableMigrationUtils {
     }
 
     public static List<Migrator> getImporters(
-            String connector,
-            HiveCatalog paimonCatalog,
-            String sourceDatabase,
-            Map<String, String> options) {
+            String connector, Catalog catalog, String sourceDatabase, 
Map<String, String> options) {
         switch (connector) {
             case "hive":
-                return HiveMigrator.databaseMigrators(paimonCatalog, 
sourceDatabase, options);
+                if (catalog instanceof CachingCatalog) {
+                    catalog = ((CachingCatalog) catalog).wrapped();
+                }
+                if (!(catalog instanceof HiveCatalog)) {
+                    throw new IllegalArgumentException("Only support Hive 
Catalog");
+                }
+                return HiveMigrator.databaseMigrators(
+                        (HiveCatalog) catalog, sourceDatabase, options);
             default:
                 throw new UnsupportedOperationException("Don't support 
connector " + connector);
         }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 0fe417364..bbc827b24 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -19,14 +19,7 @@
 package org.apache.paimon.flink;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.schema.Schema;
-import org.apache.paimon.schema.SchemaManager;
-import org.apache.paimon.schema.SchemaUtils;
 import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.utils.BlockingIterator;
 import org.apache.paimon.utils.DateTimeUtils;
 
@@ -38,7 +31,6 @@ import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -440,33 +432,6 @@ public class BatchFileStoreITCase extends 
CatalogITCaseBase {
         assertThat(sql("SELECT * FROM 
ignore_delete")).containsExactly(Row.of(1, "B"));
     }
 
-    @Test
-    public void testIgnoreDeleteCompatible() throws Exception {
-        sql(
-                "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, 
v STRING) "
-                        + "WITH ('merge-engine' = 'deduplicate', 'write-only' 
= 'true')");
-
-        sql("INSERT INTO ignore_delete VALUES (1, 'A')");
-        // write delete records
-        sql("DELETE FROM ignore_delete WHERE pk = 1");
-        assertThat(sql("SELECT * FROM ignore_delete")).isEmpty();
-
-        // set ignore-delete and read
-        Map<String, String> newOptions = new HashMap<>();
-        newOptions.put(CoreOptions.IGNORE_DELETE.key(), "true");
-        SchemaUtils.forceCommit(
-                new SchemaManager(LocalFileIO.create(), new Path(path, 
"default.db/ignore_delete")),
-                new Schema(
-                        Arrays.asList(
-                                new DataField(0, "pk", 
DataTypes.INT().notNull()),
-                                new DataField(1, "v", DataTypes.STRING())),
-                        Collections.emptyList(),
-                        Collections.singletonList("pk"),
-                        newOptions,
-                        null));
-        assertThat(sql("SELECT * FROM 
ignore_delete")).containsExactlyInAnyOrder(Row.of(1, "A"));
-    }
-
     @Test
     public void testIgnoreDeleteWithRowKindField() {
         sql(
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
index e358fb311..4e1ea424f 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.flink;
 
-import org.apache.paimon.catalog.AbstractCatalog;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogLock;
 import org.apache.paimon.catalog.CatalogLockContext;
@@ -98,7 +97,7 @@ public class FileSystemCatalogITCase extends AbstractTestBase 
{
         Identifier identifier = new Identifier(DB_NAME, "t3");
         Catalog catalog =
                 ((FlinkCatalog) 
tEnv.getCatalog(tEnv.getCurrentCatalog()).get()).catalog();
-        Path tablePath = ((AbstractCatalog) 
catalog).getDataTableLocation(identifier);
+        Path tablePath = new 
Path(catalog.getTable(identifier).options().get("path"));
         assertThat(tablePath.toString())
                 .isEqualTo(new File(path, DB_NAME + ".db" + File.separator + 
"t3").toString());
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
index 587069ada..f95ec3ae4 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.flink;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.catalog.AbstractCatalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.log.LogSinkProvider;
@@ -647,9 +646,18 @@ public class FlinkCatalogTest {
             CatalogTable t2,
             Map<String, String> optionsToAdd,
             Set<String> optionsToRemove) {
-        Path tablePath =
-                ((AbstractCatalog) ((FlinkCatalog) catalog).catalog())
-                        .getDataTableLocation(FlinkCatalog.toIdentifier(path));
+        Path tablePath;
+        try {
+            tablePath =
+                    new Path(
+                            ((FlinkCatalog) catalog)
+                                    .catalog()
+                                    .getTable(FlinkCatalog.toIdentifier(path))
+                                    .options()
+                                    .get(CoreOptions.PATH.key()));
+        } catch (org.apache.paimon.catalog.Catalog.TableNotExistException e) {
+            throw new RuntimeException(e);
+        }
         Map<String, String> options = new HashMap<>(t1.getOptions());
         options.put("path", tablePath.toString());
         options.putAll(optionsToAdd);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
index f65a50081..399109137 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
@@ -18,14 +18,6 @@
 
 package org.apache.paimon.flink;
 
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.schema.Schema;
-import org.apache.paimon.schema.SchemaManager;
-import org.apache.paimon.schema.SchemaUtils;
-import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.utils.BlockingIterator;
 import org.apache.paimon.utils.CommonTestUtils;
 
@@ -44,9 +36,7 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
@@ -627,37 +617,4 @@ public class PartialUpdateITCase extends CatalogITCaseBase 
{
                         Row.ofKind(RowKind.UPDATE_AFTER, 1, "A", "apple"));
         iterator.close();
     }
-
-    @Test
-    public void testIgnoreDeleteCompatible() throws Exception {
-        sql(
-                "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, 
a STRING, b STRING) WITH ("
-                        + " 'merge-engine' = 'deduplicate',"
-                        + " 'write-only' = 'true')");
-        sql("INSERT INTO ignore_delete VALUES (1, CAST (NULL AS STRING), 
'apple')");
-        // write delete records
-        sql("DELETE FROM ignore_delete WHERE pk = 1");
-        sql("INSERT INTO ignore_delete VALUES (1, 'A', CAST (NULL AS 
STRING))");
-        assertThat(sql("SELECT * FROM ignore_delete"))
-                .containsExactlyInAnyOrder(Row.of(1, "A", null));
-
-        // force altering merge engine and read
-        Map<String, String> newOptions = new HashMap<>();
-        newOptions.put(
-                CoreOptions.MERGE_ENGINE.key(), 
CoreOptions.MergeEngine.PARTIAL_UPDATE.toString());
-        newOptions.put(CoreOptions.IGNORE_DELETE.key(), "true");
-        SchemaUtils.forceCommit(
-                new SchemaManager(LocalFileIO.create(), new Path(path, 
"default.db/ignore_delete")),
-                new Schema(
-                        Arrays.asList(
-                                new DataField(0, "pk", 
DataTypes.INT().notNull()),
-                                new DataField(1, "a", DataTypes.STRING()),
-                                new DataField(2, "b", DataTypes.STRING())),
-                        Collections.emptyList(),
-                        Collections.singletonList("pk"),
-                        newOptions,
-                        null));
-        assertThat(sql("SELECT * FROM ignore_delete"))
-                .containsExactlyInAnyOrder(Row.of(1, "A", "apple"));
-    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
index 5e7f0108c..1b17ee172 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
@@ -51,6 +51,8 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 
+import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED;
+
 /** {@link Action} test base. */
 public abstract class ActionITCaseBase extends AbstractTestBase {
 
@@ -70,7 +72,9 @@ public abstract class ActionITCaseBase extends 
AbstractTestBase {
         tableName = "test_table_" + UUID.randomUUID();
         commitUser = UUID.randomUUID().toString();
         incrementalIdentifier = 0;
-        catalog = CatalogFactory.createCatalog(CatalogContext.create(new 
Path(warehouse)));
+        CatalogContext context = CatalogContext.create(new Path(warehouse));
+        context.options().set(CACHE_ENABLED, false);
+        catalog = CatalogFactory.createCatalog(context);
     }
 
     @AfterEach
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/privilege/PrivilegeProcedureITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/privilege/PrivilegeProcedureITCase.java
index e0c5f5d66..db973b39c 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/privilege/PrivilegeProcedureITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/privilege/PrivilegeProcedureITCase.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.flink.procedure.privilege;
 
 import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.FileSystemCatalog;
 import org.apache.paimon.flink.FlinkCatalog;
 import org.apache.paimon.flink.util.AbstractTestBase;
 import org.apache.paimon.privilege.FileBasedPrivilegeManager;
@@ -85,7 +84,6 @@ public class PrivilegeProcedureITCase extends 
AbstractTestBase {
         Catalog paimonCatalog = ((FlinkCatalog) catalog).catalog();
         assertThat(paimonCatalog).isInstanceOf(PrivilegedCatalog.class);
         PrivilegedCatalog privilegedCatalog = (PrivilegedCatalog) 
paimonCatalog;
-        
assertThat(privilegedCatalog.wrapped()).isInstanceOf(FileSystemCatalog.class);
         assertThat(privilegedCatalog.privilegeManager())
                 .isInstanceOf(FileBasedPrivilegeManager.class);
         
assertThat(privilegedCatalog.privilegeManager().privilegeEnabled()).isTrue();
diff --git 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 1bbf4b387..e9fc9f32c 100644
--- 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -36,9 +36,6 @@ import org.apache.paimon.operation.Lock;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.options.OptionsUtils;
-import org.apache.paimon.privilege.FileBasedPrivilegeManager;
-import org.apache.paimon.privilege.PrivilegeManager;
-import org.apache.paimon.privilege.PrivilegedCatalog;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
@@ -875,25 +872,12 @@ public class HiveCatalog extends AbstractCatalog {
             throw new UncheckedIOException(e);
         }
 
-        Catalog catalog =
-                new HiveCatalog(
-                        fileIO,
-                        hiveConf,
-                        options.get(HiveCatalogFactory.METASTORE_CLIENT_CLASS),
-                        options,
-                        warehouse.toUri().toString());
-
-        PrivilegeManager privilegeManager =
-                new FileBasedPrivilegeManager(
-                        warehouse.toString(),
-                        fileIO,
-                        context.options().get(PrivilegedCatalog.USER),
-                        context.options().get(PrivilegedCatalog.PASSWORD));
-        if (privilegeManager.privilegeEnabled()) {
-            catalog = new PrivilegedCatalog(catalog, privilegeManager);
-        }
-
-        return catalog;
+        return new HiveCatalog(
+                fileIO,
+                hiveConf,
+                options.get(HiveCatalogFactory.METASTORE_CLIENT_CLASS),
+                options,
+                warehouse.toUri().toString());
     }
 
     public static HiveConf createHiveConf(CatalogContext context) {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index b04dce2fa..dd3efc724 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -219,6 +219,14 @@ public class SparkCatalog extends SparkBaseCatalog {
         }
     }
 
+    @Override
+    public void invalidateTable(Identifier ident) {
+        try {
+            catalog.invalidateTable(toIdentifier(ident));
+        } catch (NoSuchTableException ignored) {
+        }
+    }
+
     @Override
     public SparkTable loadTable(Identifier ident) throws NoSuchTableException {
         return loadSparkTable(ident, Collections.emptyMap());
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/TableMigrationUtils.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/TableMigrationUtils.java
index 4a45b365a..992f17253 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/TableMigrationUtils.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/TableMigrationUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.spark.utils;
 
+import org.apache.paimon.catalog.CachingCatalog;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.hive.HiveCatalog;
 import org.apache.paimon.hive.migrate.HiveMigrator;
@@ -30,7 +31,7 @@ public class TableMigrationUtils {
 
     public static Migrator getImporter(
             String connector,
-            Catalog paimonCatalog,
+            Catalog catalog,
             String sourceDatabase,
             String sourceTableName,
             String targetDatabase,
@@ -38,9 +39,14 @@ public class TableMigrationUtils {
             Map<String, String> options) {
         switch (connector) {
             case "hive":
-                assert paimonCatalog instanceof HiveCatalog;
+                if (catalog instanceof CachingCatalog) {
+                    catalog = ((CachingCatalog) catalog).wrapped();
+                }
+                if (!(catalog instanceof HiveCatalog)) {
+                    throw new IllegalArgumentException("Only support Hive 
Catalog");
+                }
                 return new HiveMigrator(
-                        (HiveCatalog) paimonCatalog,
+                        (HiveCatalog) catalog,
                         sourceDatabase,
                         sourceTableName,
                         targetDatabase,
diff --git 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
index 55ce7c9aa..f12a3d8fa 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
+++ 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
@@ -59,6 +59,7 @@ import java.util.Optional;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED;
 import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -88,6 +89,7 @@ public class SparkFileIndexITCase extends SparkWriteITCase {
 
         Options options = new Options();
         options.set(WAREHOUSE, 
spark.conf().get("spark.sql.catalog.paimon.warehouse"));
+        options.set(CACHE_ENABLED, false);
         fileSystemCatalog =
                 (FileSystemCatalog) 
CatalogFactory.createCatalog(CatalogContext.create(options));
     }
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
index b8115132d..983dd037f 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
@@ -19,7 +19,7 @@
 package org.apache.paimon.spark
 
 import org.apache.paimon.catalog.{Catalog, CatalogContext, CatalogFactory, 
Identifier}
-import org.apache.paimon.options.Options
+import org.apache.paimon.options.{CatalogOptions, Options}
 import org.apache.paimon.spark.catalog.Catalogs
 import org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
 import org.apache.paimon.spark.sql.{SparkVersionSupport, WithTableOptions}
@@ -36,6 +36,8 @@ import org.scalactic.source.Position
 import org.scalatest.Tag
 
 import java.io.File
+import java.util
+import java.util.{HashMap => JHashMap}
 import java.util.TimeZone
 
 import scala.util.Random
@@ -64,6 +66,7 @@ class PaimonSparkTestBase
     super.sparkConf
       .set("spark.sql.catalog.paimon", classOf[SparkCatalog].getName)
       .set("spark.sql.catalog.paimon.warehouse", tempDBDir.getCanonicalPath)
+      .set("spark.sql.catalog.paimon.cache-enabled", "false")
       .set("spark.sql.extensions", 
classOf[PaimonSparkSessionExtensions].getName)
       .set("spark.serializer", serializer)
   }
@@ -122,7 +125,9 @@ class PaimonSparkTestBase
 
   private def initCatalog(): Catalog = {
     val currentCatalog = 
spark.sessionState.catalogManager.currentCatalog.name()
-    val options = Catalogs.catalogOptions(currentCatalog, 
spark.sessionState.conf)
+    val options =
+      new JHashMap[String, String](Catalogs.catalogOptions(currentCatalog, 
spark.sessionState.conf))
+    options.put(CatalogOptions.CACHE_ENABLED.key(), "false")
     val catalogContext =
       CatalogContext.create(Options.fromMap(options), 
spark.sessionState.newHadoopConf())
     CatalogFactory.createCatalog(catalogContext)


Reply via email to