This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6fb8deebe [core] Delete useless methods in Catalog (#4468)
6fb8deebe is described below

commit 6fb8deebe8abfff9aa6296ea86a5dce905681f22
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Nov 6 19:31:53 2024 +0800

    [core] Delete useless methods in Catalog (#4468)
---
 .../org/apache/paimon/catalog/AbstractCatalog.java | 57 +++++++++------
 .../org/apache/paimon/catalog/CachingCatalog.java  | 17 +++--
 .../java/org/apache/paimon/catalog/Catalog.java    | 71 ++-----------------
 .../java/org/apache/paimon/catalog/Database.java   | 82 ++++++++++++++++++++++
 .../org/apache/paimon/catalog/DelegateCatalog.java | 33 +--------
 .../apache/paimon/catalog/FileSystemCatalog.java   | 16 +----
 .../java/org/apache/paimon/jdbc/JdbcCatalog.java   | 16 ++---
 .../apache/paimon/privilege/PrivilegedCatalog.java | 31 +++++---
 .../org/apache/paimon/catalog/CatalogTestBase.java | 54 +++-----------
 .../flink/procedure/MigrateFileProcedure.java      |  5 +-
 .../flink/action/cdc/SyncTableActionBase.java      |  4 +-
 .../java/org/apache/paimon/flink/FlinkCatalog.java | 34 +++++++--
 .../apache/paimon/flink/log/LogStoreRegister.java  | 13 ++--
 .../flink/procedure/MigrateFileProcedure.java      |  5 +-
 .../java/org/apache/paimon/hive/HiveCatalog.java   | 75 +++++++++-----------
 .../apache/paimon/hive/migrate/HiveMigrator.java   | 22 +++---
 .../apache/paimon/hive/HiveCatalogITCaseBase.java  | 18 ++---
 .../java/org/apache/paimon/spark/SparkCatalog.java | 21 +++---
 .../spark/procedure/MigrateFileProcedure.java      |  4 +-
 19 files changed, 288 insertions(+), 290 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index b3f255f10..8e885b95c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -98,7 +98,6 @@ public abstract class AbstractCatalog implements Catalog {
         return fileIO;
     }
 
-    @Override
     public Optional<CatalogLockFactory> lockFactory() {
         if (!lockEnabled()) {
             return Optional.empty();
@@ -118,7 +117,6 @@ public abstract class AbstractCatalog implements Catalog {
         return Optional.empty();
     }
 
-    @Override
     public Optional<CatalogLockContext> lockContext() {
         return Optional.of(CatalogLockContext.fromOptions(catalogOptions));
     }
@@ -136,26 +134,26 @@ public abstract class AbstractCatalog implements Catalog {
     public void createDatabase(String name, boolean ignoreIfExists, Map<String, String> properties)
             throws DatabaseAlreadyExistException {
         checkNotSystemDatabase(name);
-        if (databaseExists(name)) {
+        try {
+            getDatabase(name);
             if (ignoreIfExists) {
                 return;
             }
             throw new DatabaseAlreadyExistException(name);
+        } catch (DatabaseNotExistException ignored) {
         }
         createDatabaseImpl(name, properties);
     }
 
     @Override
-    public Map<String, String> loadDatabaseProperties(String name)
-            throws DatabaseNotExistException {
+    public Database getDatabase(String name) throws DatabaseNotExistException {
         if (isSystemDatabase(name)) {
-            return Collections.emptyMap();
+            return Database.of(name);
         }
-        return loadDatabasePropertiesImpl(name);
+        return getDatabaseImpl(name);
     }
 
-    protected abstract Map<String, String> loadDatabasePropertiesImpl(String name)
-            throws DatabaseNotExistException;
+    protected abstract Database getDatabaseImpl(String name) throws DatabaseNotExistException;
 
     @Override
     public void createPartition(Identifier identifier, Map<String, String> partitionSpec)
@@ -211,7 +209,9 @@ public abstract class AbstractCatalog implements Catalog {
     public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
             throws DatabaseNotExistException, DatabaseNotEmptyException {
         checkNotSystemDatabase(name);
-        if (!databaseExists(name)) {
+        try {
+            getDatabase(name);
+        } catch (DatabaseNotExistException e) {
             if (ignoreIfNotExists) {
                 return;
             }
@@ -232,9 +232,9 @@ public abstract class AbstractCatalog implements Catalog {
         if (isSystemDatabase(databaseName)) {
             return SystemTableLoader.loadGlobalTableNames();
         }
-        if (!databaseExists(databaseName)) {
-            throw new DatabaseNotExistException(databaseName);
-        }
+
+        // check db exists
+        getDatabase(databaseName);
 
         return listTablesImpl(databaseName).stream().sorted().collect(Collectors.toList());
     }
@@ -247,7 +247,9 @@ public abstract class AbstractCatalog implements Catalog {
         checkNotBranch(identifier, "dropTable");
         checkNotSystemTable(identifier, "dropTable");
 
-        if (!tableExists(identifier)) {
+        try {
+            getTable(identifier);
+        } catch (TableNotExistException e) {
             if (ignoreIfNotExists) {
                 return;
             }
@@ -268,15 +270,16 @@ public abstract class AbstractCatalog implements Catalog {
         validateFieldNameCaseInsensitive(schema.rowType().getFieldNames());
         validateAutoCreateClose(schema.options());
 
-        if (!databaseExists(identifier.getDatabaseName())) {
-            throw new DatabaseNotExistException(identifier.getDatabaseName());
-        }
+        // check db exists
+        getDatabase(identifier.getDatabaseName());
 
-        if (tableExists(identifier)) {
+        try {
+            getTable(identifier);
             if (ignoreIfExists) {
                 return;
             }
             throw new TableAlreadyExistException(identifier);
+        } catch (TableNotExistException ignored) {
         }
 
         copyTableDefaultOptions(schema.options());
@@ -299,15 +302,19 @@ public abstract class AbstractCatalog implements Catalog {
         checkNotSystemTable(toTable, "renameTable");
         validateIdentifierNameCaseInsensitive(toTable);
 
-        if (!tableExists(fromTable)) {
+        try {
+            getTable(fromTable);
+        } catch (TableNotExistException e) {
             if (ignoreIfNotExists) {
                 return;
             }
             throw new TableNotExistException(fromTable);
         }
 
-        if (tableExists(toTable)) {
+        try {
+            getTable(toTable);
             throw new TableAlreadyExistException(toTable);
+        } catch (TableNotExistException ignored) {
         }
 
         renameTableImpl(fromTable, toTable);
@@ -323,7 +330,9 @@ public abstract class AbstractCatalog implements Catalog {
         validateIdentifierNameCaseInsensitive(identifier);
         validateFieldNameCaseInsensitiveInSchemaChange(changes);
 
-        if (!tableExists(identifier)) {
+        try {
+            getTable(identifier);
+        } catch (TableNotExistException e) {
             if (ignoreIfNotExists) {
                 return;
             }
@@ -452,6 +461,12 @@ public abstract class AbstractCatalog implements Catalog {
     protected abstract TableSchema getDataTableSchema(Identifier identifier)
             throws TableNotExistException;
 
+    /** Get metastore client factory for the table specified by {@code identifier}. */
+    protected Optional<MetastoreClient.Factory> metastoreClientFactory(Identifier identifier)
+            throws TableNotExistException {
+        return Optional.empty();
+    }
+
     @Override
     public Path getTableLocation(Identifier identifier) {
         return new Path(newDatabasePath(identifier.getDatabaseName()), identifier.getTableName());
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
index 1a6b23078..003f0edb4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
@@ -62,7 +62,7 @@ public class CachingCatalog extends DelegateCatalog {
 
     private static final Logger LOG = LoggerFactory.getLogger(CachingCatalog.class);
 
-    protected final Cache<String, Map<String, String>> databaseCache;
+    protected final Cache<String, Database> databaseCache;
     protected final Cache<Identifier, Table> tableCache;
     @Nullable protected final SegmentsCache<Path> manifestCache;
 
@@ -159,16 +159,15 @@ public class CachingCatalog extends DelegateCatalog {
     }
 
     @Override
-    public Map<String, String> loadDatabaseProperties(String databaseName)
-            throws DatabaseNotExistException {
-        Map<String, String> properties = databaseCache.getIfPresent(databaseName);
-        if (properties != null) {
-            return properties;
+    public Database getDatabase(String databaseName) throws DatabaseNotExistException {
+        Database database = databaseCache.getIfPresent(databaseName);
+        if (database != null) {
+            return database;
         }
 
-        properties = super.loadDatabaseProperties(databaseName);
-        databaseCache.put(databaseName, properties);
-        return properties;
+        database = super.getDatabase(databaseName);
+        databaseCache.put(databaseName, database);
+        return database;
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index c72c354e4..6a6a047bd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -22,7 +22,6 @@ import org.apache.paimon.annotation.Public;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.PartitionEntry;
-import org.apache.paimon.metastore.MetastoreClient;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.table.Table;
@@ -33,7 +32,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
@@ -72,23 +70,6 @@ public interface Catalog extends AutoCloseable {
 
     FileIO fileIO();
 
-    /**
-     * Get lock factory from catalog. Lock is used to support multiple concurrent writes on the
-     * object store.
-     */
-    Optional<CatalogLockFactory> lockFactory();
-
-    /** Get lock context for lock factory to create a lock. */
-    default Optional<CatalogLockContext> lockContext() {
-        return Optional.empty();
-    }
-
-    /** Get metastore client factory for the table specified by {@code identifier}. */
-    default Optional<MetastoreClient.Factory> metastoreClientFactory(Identifier identifier)
-            throws TableNotExistException {
-        return Optional.empty();
-    }
-
     /**
      * Get the names of all databases in this catalog.
      *
@@ -96,21 +77,6 @@ public interface Catalog extends AutoCloseable {
      */
     List<String> listDatabases();
 
-    /**
-     * Check if a database exists in this catalog.
-     *
-     * @param databaseName Name of the database
-     * @return true if the given database exists in the catalog false otherwise
-     */
-    default boolean databaseExists(String databaseName) {
-        try {
-            loadDatabaseProperties(databaseName);
-            return true;
-        } catch (DatabaseNotExistException e) {
-            return false;
-        }
-    }
-
     /**
      * Create a database, see {@link Catalog#createDatabase(String name, boolean ignoreIfExists, Map
      * properties)}.
@@ -135,13 +101,13 @@ public interface Catalog extends AutoCloseable {
             throws DatabaseAlreadyExistException;
 
     /**
-     * Load database properties.
+     * Return a {@link Database} identified by the given name.
      *
      * @param name Database name
-     * @return The requested database's properties
+     * @return The requested {@link Database}
      * @throws DatabaseNotExistException if the requested database does not exist
      */
-    Map<String, String> loadDatabaseProperties(String name) throws DatabaseNotExistException;
+    Database getDatabase(String name) throws DatabaseNotExistException;
 
     /**
      * Drop a database.
@@ -186,20 +152,6 @@ public interface Catalog extends AutoCloseable {
      */
     List<String> listTables(String databaseName) throws DatabaseNotExistException;
 
-    /**
-     * Check if a table exists in this catalog.
-     *
-     * @param identifier Path of the table
-     * @return true if the given table exists in the catalog false otherwise
-     */
-    default boolean tableExists(Identifier identifier) {
-        try {
-            return getTable(identifier) != null;
-        } catch (TableNotExistException e) {
-            return false;
-        }
-    }
-
     /**
      * Drop a table.
      *
@@ -273,6 +225,9 @@ public interface Catalog extends AutoCloseable {
     /**
      * Create the partition of the specify table.
      *
+     * <p>Only catalog with metastore can support this method, and only table with
+     * 'metastore.partitioned-table' can support this method.
+     *
      * @param identifier path of the table to drop partition
      * @param partitionSpec the partition to be created
      * @throws TableNotExistException if the table does not exist
@@ -315,20 +270,6 @@ public interface Catalog extends AutoCloseable {
         alterTable(identifier, Collections.singletonList(change), ignoreIfNotExists);
     }
 
-    /**
-     * Check if a view exists in this catalog.
-     *
-     * @param identifier Path of the view
-     * @return true if the given view exists in the catalog false otherwise
-     */
-    default boolean viewExists(Identifier identifier) {
-        try {
-            return getView(identifier) != null;
-        } catch (ViewNotExistException e) {
-            return false;
-        }
-    }
-
     /**
      * Return a {@link View} identified by the given {@link Identifier}.
      *
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Database.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Database.java
new file mode 100644
index 000000000..f855e57e9
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Database.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.catalog;
+
+import org.apache.paimon.annotation.Public;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Interface of a database in a catalog.
+ *
+ * @since 1.0
+ */
+@Public
+public interface Database {
+
+    /** A name to identify this database. */
+    String name();
+
+    /** Options of this database. */
+    Map<String, String> options();
+
+    /** Optional comment of this database. */
+    Optional<String> comment();
+
+    static Database of(String name, Map<String, String> options, @Nullable String comment) {
+        return new DatabaseImpl(name, options, comment);
+    }
+
+    static Database of(String name) {
+        return new DatabaseImpl(name, new HashMap<>(), null);
+    }
+
+    /** Implementation of {@link Database}. */
+    class DatabaseImpl implements Database {
+
+        private final String name;
+        private final Map<String, String> options;
+        @Nullable private final String comment;
+
+        public DatabaseImpl(String name, Map<String, String> options, @Nullable String comment) {
+            this.name = name;
+            this.options = options;
+            this.comment = comment;
+        }
+
+        @Override
+        public String name() {
+            return name;
+        }
+
+        @Override
+        public Map<String, String> options() {
+            return options;
+        }
+
+        @Override
+        public Optional<String> comment() {
+            return Optional.ofNullable(comment);
+        }
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
index 01719e590..ec14d53a2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
@@ -21,7 +21,6 @@ package org.apache.paimon.catalog;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.PartitionEntry;
-import org.apache.paimon.metastore.MetastoreClient;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.table.Table;
@@ -29,7 +28,6 @@ import org.apache.paimon.view.View;
 
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 
 /** A {@link Catalog} to delegate all operations to another {@link Catalog}. */
 public class DelegateCatalog implements Catalog {
@@ -64,22 +62,6 @@ public class DelegateCatalog implements Catalog {
         return wrapped.fileIO();
     }
 
-    @Override
-    public Optional<CatalogLockFactory> lockFactory() {
-        return wrapped.lockFactory();
-    }
-
-    @Override
-    public Optional<CatalogLockContext> lockContext() {
-        return wrapped.lockContext();
-    }
-
-    @Override
-    public Optional<MetastoreClient.Factory> metastoreClientFactory(Identifier identifier)
-            throws TableNotExistException {
-        return wrapped.metastoreClientFactory(identifier);
-    }
-
     @Override
     public List<String> listDatabases() {
         return wrapped.listDatabases();
@@ -92,9 +74,8 @@ public class DelegateCatalog implements Catalog {
     }
 
     @Override
-    public Map<String, String> loadDatabaseProperties(String name)
-            throws DatabaseNotExistException {
-        return wrapped.loadDatabaseProperties(name);
+    public Database getDatabase(String name) throws DatabaseNotExistException {
+        return wrapped.getDatabase(name);
     }
 
     @Override
@@ -138,16 +119,6 @@ public class DelegateCatalog implements Catalog {
         return wrapped.getTable(identifier);
     }
 
-    @Override
-    public boolean tableExists(Identifier identifier) {
-        return wrapped.tableExists(identifier);
-    }
-
-    @Override
-    public boolean viewExists(Identifier identifier) {
-        return wrapped.viewExists(identifier);
-    }
-
     @Override
     public View getView(Identifier identifier) throws ViewNotExistException {
         return wrapped.getView(identifier);
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
index 450f78873..9264a5464 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
@@ -30,7 +30,6 @@ import org.apache.paimon.schema.TableSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
@@ -81,12 +80,11 @@ public class FileSystemCatalog extends AbstractCatalog {
     }
 
     @Override
-    public Map<String, String> loadDatabasePropertiesImpl(String name)
-            throws DatabaseNotExistException {
+    public Database getDatabaseImpl(String name) throws DatabaseNotExistException {
         if (!uncheck(() -> fileIO.exists(newDatabasePath(name)))) {
             throw new DatabaseNotExistException(name);
         }
-        return Collections.emptyMap();
+        return Database.of(name);
     }
 
     @Override
@@ -99,16 +97,6 @@ public class FileSystemCatalog extends AbstractCatalog {
         return uncheck(() -> listTablesInFileSystem(newDatabasePath(databaseName)));
     }
 
-    @Override
-    public boolean tableExists(Identifier identifier) {
-        if (isTableInSystemDatabase(identifier)) {
-            return super.tableExists(identifier);
-        }
-
-        return tableExistsInFileSystem(
-                getTableLocation(identifier), identifier.getBranchNameOrDefault());
-    }
-
     @Override
     public TableSchema getDataTableSchema(Identifier identifier) throws TableNotExistException {
         return tableSchemaInFileSystem(
diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
index e52bc9d6b..c80f8e3a5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
@@ -22,6 +22,7 @@ import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.catalog.AbstractCatalog;
 import org.apache.paimon.catalog.CatalogLockContext;
 import org.apache.paimon.catalog.CatalogLockFactory;
+import org.apache.paimon.catalog.Database;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
@@ -159,18 +160,17 @@ public class JdbcCatalog extends AbstractCatalog {
     }
 
     @Override
-    protected Map<String, String> loadDatabasePropertiesImpl(String databaseName)
-            throws DatabaseNotExistException {
+    protected Database getDatabaseImpl(String databaseName) throws DatabaseNotExistException {
         if (!JdbcUtils.databaseExists(connections, catalogKey, databaseName)) {
             throw new DatabaseNotExistException(databaseName);
         }
-        Map<String, String> properties = Maps.newHashMap();
-        properties.putAll(fetchProperties(databaseName));
-        if (!properties.containsKey(DB_LOCATION_PROP)) {
-            properties.put(DB_LOCATION_PROP, newDatabasePath(databaseName).getName());
+        Map<String, String> options = Maps.newHashMap();
+        options.putAll(fetchProperties(databaseName));
+        if (!options.containsKey(DB_LOCATION_PROP)) {
+            options.put(DB_LOCATION_PROP, newDatabasePath(databaseName).getName());
         }
-        properties.remove(DATABASE_EXISTS_PROPERTY);
-        return ImmutableMap.copyOf(properties);
+        options.remove(DATABASE_EXISTS_PROPERTY);
+        return Database.of(databaseName, options, null);
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java
index bda06a08d..c9b9c2193 100644
--- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java
@@ -102,12 +102,16 @@ public class PrivilegedCatalog extends DelegateCatalog {
             throws TableNotExistException, TableAlreadyExistException {
         privilegeManager.getPrivilegeChecker().assertCanAlterTable(fromTable);
         wrapped.renameTable(fromTable, toTable, ignoreIfNotExists);
-        Preconditions.checkState(
-                wrapped.tableExists(toTable),
-                "Table "
-                        + toTable
-                        + " does not exist. There might be concurrent renaming. "
-                        + "Aborting updates in privilege system.");
+
+        try {
+            getTable(toTable);
+        } catch (TableNotExistException e) {
+            throw new IllegalStateException(
+                    "Table "
+                            + toTable
+                            + " does not exist. There might be concurrent renaming. "
+                            + "Aborting updates in privilege system.");
+        }
         privilegeManager.objectRenamed(fromTable.getFullName(), toTable.getFullName());
     }
 
@@ -157,8 +161,11 @@ public class PrivilegedCatalog extends DelegateCatalog {
         Preconditions.checkArgument(
                 privilege.canGrantOnDatabase(),
                 "Privilege " + privilege + " can't be granted on a database");
-        Preconditions.checkArgument(
-                databaseExists(databaseName), "Database " + databaseName + " does not exist");
+        try {
+            getDatabase(databaseName);
+        } catch (DatabaseNotExistException e) {
+            throw new IllegalArgumentException("Database " + databaseName + " does not exist");
+        }
         privilegeManager.grant(user, databaseName, privilege);
     }
 
@@ -166,8 +173,12 @@ public class PrivilegedCatalog extends DelegateCatalog {
         Preconditions.checkArgument(
                 privilege.canGrantOnTable(),
                 "Privilege " + privilege + " can't be granted on a table");
-        Preconditions.checkArgument(
-                tableExists(identifier), "Table " + identifier + " does not exist");
+
+        try {
+            getTable(identifier);
+        } catch (TableNotExistException e) {
+            throw new IllegalArgumentException("Table " + identifier + " does not exist");
+        }
         privilegeManager.grant(user, identifier.getFullName(), privilege);
     }
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
index 1a087f6b4..27992b56f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
@@ -110,24 +110,11 @@ public abstract class CatalogTestBase {
         assertThat(databases).contains("db1", "db2", "db3");
     }
 
-    @Test
-    public void testDatabaseExistsWhenExists() throws Exception {
-        // Database exists returns true when the database exists
-        catalog.createDatabase("test_db", false);
-        boolean exists = catalog.databaseExists("test_db");
-        assertThat(exists).isTrue();
-
-        // Database exists returns false when the database does not exist
-        exists = catalog.databaseExists("non_existing_db");
-        assertThat(exists).isFalse();
-    }
-
     @Test
     public void testCreateDatabase() throws Exception {
         // Create database creates a new database when it does not exist
         catalog.createDatabase("new_db", false);
-        boolean exists = catalog.databaseExists("new_db");
-        assertThat(exists).isTrue();
+        catalog.getDatabase("new_db");
 
         catalog.createDatabase("existing_db", false);
 
@@ -148,8 +135,8 @@ public abstract class CatalogTestBase {
         // Drop database deletes the database when it exists and there are no tables
         catalog.createDatabase("db_to_drop", false);
         catalog.dropDatabase("db_to_drop", false, false);
-        boolean exists = catalog.databaseExists("db_to_drop");
-        assertThat(exists).isFalse();
+        assertThatThrownBy(() -> catalog.getDatabase("db_to_drop"))
+                .isInstanceOf(Catalog.DatabaseNotExistException.class);
 
         // Drop database does not throw exception when database does not exist and ignoreIfNotExists
         // is true
@@ -162,8 +149,8 @@ public abstract class CatalogTestBase {
         catalog.createTable(Identifier.create("db_to_drop", "table2"), DEFAULT_TABLE_SCHEMA, false);
 
         catalog.dropDatabase("db_to_drop", false, true);
-        exists = catalog.databaseExists("db_to_drop");
-        assertThat(exists).isFalse();
+        assertThatThrownBy(() -> catalog.getDatabase("db_to_drop"))
+                .isInstanceOf(Catalog.DatabaseNotExistException.class);
 
         // Drop database throws DatabaseNotEmptyException when cascade is false and there are tables
         // in the database
@@ -192,21 +179,6 @@ public abstract class CatalogTestBase {
         assertThat(tables).containsExactlyInAnyOrder("table1", "table2", "table3");
     }
 
-    @Test
-    public void testTableExists() throws Exception {
-        // Table exists returns true when the table exists in the database
-        catalog.createDatabase("test_db", false);
-        Identifier identifier = Identifier.create("test_db", "test_table");
-        catalog.createTable(identifier, DEFAULT_TABLE_SCHEMA, false);
-
-        boolean exists = catalog.tableExists(identifier);
-        assertThat(exists).isTrue();
-
-        // Table exists returns false when the table does not exist in the database
-        exists = catalog.tableExists(Identifier.create("non_existing_db", "non_existing_table"));
-        assertThat(exists).isFalse();
-    }
-
     @Test
     public void testCreateTable() throws Exception {
         catalog.createDatabase("test_db", false);
@@ -238,8 +210,7 @@ public abstract class CatalogTestBase {
         schema.options().remove(CoreOptions.AUTO_CREATE.key());
 
         catalog.createTable(identifier, schema, false);
-        boolean exists = catalog.tableExists(identifier);
-        assertThat(exists).isTrue();
+        catalog.getTable(identifier);
 
         // Create table throws Exception when table is system table
         assertThatExceptionOfType(IllegalArgumentException.class)
@@ -381,8 +352,8 @@ public abstract class CatalogTestBase {
         Identifier identifier = Identifier.create("test_db", "table_to_drop");
         catalog.createTable(identifier, DEFAULT_TABLE_SCHEMA, false);
         catalog.dropTable(identifier, false);
-        boolean exists = catalog.tableExists(identifier);
-        assertThat(exists).isFalse();
+        assertThatThrownBy(() -> catalog.getTable(identifier))
+                .isInstanceOf(Catalog.TableNotExistException.class);
 
         // Drop table throws Exception when table is system table
         assertThatExceptionOfType(IllegalArgumentException.class)
@@ -414,8 +385,9 @@ public abstract class CatalogTestBase {
         catalog.createTable(fromTable, DEFAULT_TABLE_SCHEMA, false);
         Identifier toTable = Identifier.create("test_db", "new_table");
         catalog.renameTable(fromTable, toTable, false);
-        assertThat(catalog.tableExists(fromTable)).isFalse();
-        assertThat(catalog.tableExists(toTable)).isTrue();
+        assertThatThrownBy(() -> catalog.getTable(fromTable))
+                .isInstanceOf(Catalog.TableNotExistException.class);
+        catalog.getTable(toTable);
 
         // Rename table throws Exception when original or target table is system table
         assertThatExceptionOfType(IllegalArgumentException.class)
@@ -885,8 +857,6 @@ public abstract class CatalogTestBase {
 
         catalog.createView(identifier, view, false);
 
-        assertThat(catalog.viewExists(identifier)).isTrue();
-
         View catalogView = catalog.getView(identifier);
         assertThat(catalogView.fullName()).isEqualTo(view.fullName());
         assertThat(catalogView.rowType()).isEqualTo(view.rowType());
@@ -911,8 +881,6 @@ public abstract class CatalogTestBase {
         catalog.renameView(identifier, newIdentifier, false);
 
         catalog.dropView(newIdentifier, false);
-        assertThat(catalog.viewExists(newIdentifier)).isFalse();
-
         catalog.dropView(newIdentifier, true);
         assertThatThrownBy(() -> catalog.dropView(newIdentifier, false))
                 .isInstanceOf(Catalog.ViewNotExistException.class);
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
index 5d68cc0f5..1e581c38c 100644
--- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.procedure;
 
+import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.utils.TableMigrationUtils;
 import org.apache.paimon.migrate.Migrator;
@@ -88,7 +89,9 @@ public class MigrateFileProcedure extends ProcedureBase {
         Identifier sourceTableId = Identifier.fromString(sourceTablePath);
         Identifier targetTableId = Identifier.fromString(targetPaimonTablePath);
 
-        if (!(catalog.tableExists(targetTableId))) {
+        try {
+            catalog.getTable(targetTableId);
+        } catch (Catalog.TableNotExistException e) {
             throw new IllegalArgumentException(
                     "Target paimon table does not exist: " + targetPaimonTablePath);
         }
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
index 4c7db6d28..87efeb2a1 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
@@ -122,7 +122,7 @@ public abstract class SyncTableActionBase extends SynchronizationActionBase {
     protected void beforeBuildingSourceSink() throws Exception {
         Identifier identifier = new Identifier(database, table);
         // Check if table exists before trying to get or create it
-        if (catalog.tableExists(identifier)) {
+        try {
             fileStoreTable = (FileStoreTable) catalog.getTable(identifier);
             fileStoreTable = alterTableOptions(identifier, fileStoreTable);
             try {
@@ -146,7 +146,7 @@ public abstract class SyncTableActionBase extends SynchronizationActionBase {
                 // check partition keys and primary keys in case that user specified them
                 checkConstraints();
             }
-        } else {
+        } catch (Catalog.TableNotExistException e) {
             Schema retrievedSchema = retrieveSchema();
             computedColumns = buildComputedColumns(computedColumnArgs, retrievedSchema.fields());
             Schema paimonSchema = buildPaimonSchema(retrievedSchema);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index 194d73213..ec485d2eb 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -187,7 +187,9 @@ public class FlinkCatalog extends AbstractCatalog {
         this.logStoreAutoRegisterTimeout = options.get(REGISTER_TIMEOUT);
         this.disableCreateTableInDefaultDatabase = options.get(DISABLE_CREATE_TABLE_IN_DEFAULT_DB);
         if (!disableCreateTableInDefaultDatabase) {
-            if (!catalog.databaseExists(defaultDatabase)) {
+            try {
+                getDatabase(defaultDatabase);
+            } catch (DatabaseNotExistException e) {
                 try {
                     catalog.createDatabase(defaultDatabase, true);
                 } catch (Catalog.DatabaseAlreadyExistException ignore) {
@@ -212,7 +214,12 @@ public class FlinkCatalog extends AbstractCatalog {
 
     @Override
     public boolean databaseExists(String databaseName) throws CatalogException {
-        return catalog.databaseExists(databaseName);
+        try {
+            catalog.getDatabase(databaseName);
+            return true;
+        } catch (Catalog.DatabaseNotExistException e) {
+            return false;
+        }
     }
 
     @Override
@@ -346,26 +353,41 @@ public class FlinkCatalog extends AbstractCatalog {
     @Override
     public boolean tableExists(ObjectPath tablePath) throws CatalogException {
         Identifier identifier = toIdentifier(tablePath);
-        return catalog.tableExists(identifier) || catalog.viewExists(identifier);
+        try {
+            catalog.getTable(identifier);
+            return true;
+        } catch (Catalog.TableNotExistException e) {
+            try {
+                catalog.getView(identifier);
+                return true;
+            } catch (Catalog.ViewNotExistException ex) {
+                return false;
+            }
+        }
     }
 
     @Override
     public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
             throws TableNotExistException, CatalogException {
         Identifier identifier = toIdentifier(tablePath);
-        if (catalog.viewExists(identifier)) {
+        try {
+            catalog.getView(identifier);
             try {
                 catalog.dropView(identifier, ignoreIfNotExists);
                 return;
             } catch (Catalog.ViewNotExistException e) {
                 throw new RuntimeException("Unexpected exception.", e);
             }
+        } catch (Catalog.ViewNotExistException ignored) {
         }
 
         try {
             Table table = null;
-            if (logStoreAutoRegister && catalog.tableExists(identifier)) {
-                table = catalog.getTable(identifier);
+            if (logStoreAutoRegister) {
+                try {
+                    table = catalog.getTable(identifier);
+                } catch (Catalog.TableNotExistException ignored) {
+                }
             }
             catalog.dropTable(toIdentifier(tablePath), ignoreIfNotExists);
             if (logStoreAutoRegister && table != null) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreRegister.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreRegister.java
index b730d289b..ad501e204 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreRegister.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreRegister.java
@@ -46,11 +46,14 @@ public interface LogStoreRegister {
             ClassLoader classLoader) {
         Options tableOptions = Options.fromMap(options);
         String logStore = tableOptions.get(LOG_SYSTEM);
-        if (!tableOptions.get(LOG_SYSTEM).equalsIgnoreCase(NONE)
-                && !catalog.tableExists(identifier)) {
-            LogStoreRegister logStoreRegister =
-                    getLogStoreRegister(identifier, classLoader, tableOptions, logStore);
-            options.putAll(logStoreRegister.registerTopic());
+        if (!tableOptions.get(LOG_SYSTEM).equalsIgnoreCase(NONE)) {
+            try {
+                catalog.getTable(identifier);
+            } catch (Catalog.TableNotExistException e) {
+                LogStoreRegister logStoreRegister =
+                        getLogStoreRegister(identifier, classLoader, tableOptions, logStore);
+                options.putAll(logStoreRegister.registerTopic());
+            }
         }
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
index 34b016fe0..f2f10d087 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.procedure;
 
+import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.utils.TableMigrationUtils;
 import org.apache.paimon.migrate.Migrator;
@@ -77,7 +78,9 @@ public class MigrateFileProcedure extends ProcedureBase {
         Identifier sourceTableId = Identifier.fromString(sourceTablePath);
         Identifier targetTableId = Identifier.fromString(targetPaimonTablePath);
 
-        if (!(catalog.tableExists(targetTableId))) {
+        try {
+            catalog.getTable(targetTableId);
+        } catch (Catalog.TableNotExistException e) {
             throw new IllegalArgumentException(
                     "Target paimon table does not exist: " + targetPaimonTablePath);
         }
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 8f2211041..ce1607e8d 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -297,10 +297,18 @@ public class HiveCatalog extends AbstractCatalog {
     }
 
     @Override
-    public Map<String, String> loadDatabasePropertiesImpl(String name)
+    public org.apache.paimon.catalog.Database getDatabaseImpl(String name)
             throws DatabaseNotExistException {
         try {
-            return convertToProperties(clients.run(client -> client.getDatabase(name)));
+            Database database = clients.run(client -> client.getDatabase(name));
+            Map<String, String> options = new HashMap<>(database.getParameters());
+            if (database.getLocationUri() != null) {
+                options.put(DB_LOCATION_PROP, database.getLocationUri());
+            }
+            if (database.getDescription() != null) {
+                options.put(COMMENT_PROP, database.getDescription());
+            }
+            return org.apache.paimon.catalog.Database.of(name, options, database.getDescription());
         } catch (NoSuchObjectException e) {
             throw new DatabaseNotExistException(name);
         } catch (TException e) {
@@ -312,17 +320,6 @@ public class HiveCatalog extends AbstractCatalog {
         }
     }
 
-    private Map<String, String> convertToProperties(Database database) {
-        Map<String, String> properties = new HashMap<>(database.getParameters());
-        if (database.getLocationUri() != null) {
-            properties.put(DB_LOCATION_PROP, database.getLocationUri());
-        }
-        if (database.getDescription() != null) {
-            properties.put(COMMENT_PROP, database.getDescription());
-        }
-        return properties;
-    }
-
     @Override
     public void dropPartition(Identifier identifier, Map<String, String> partitionSpec)
             throws TableNotExistException {
@@ -395,11 +392,16 @@ public class HiveCatalog extends AbstractCatalog {
     @Override
     protected List<String> listTablesImpl(String databaseName) {
         try {
-            return clients.run(
-                    client ->
-                            client.getAllTables(databaseName).stream()
-                                    .filter(t -> tableExists(new Identifier(databaseName, t)))
-                                    .collect(Collectors.toList()));
+            List<String> allTables = clients.run(client -> client.getAllTables(databaseName));
+            List<String> result = new ArrayList<>(allTables.size());
+            for (String table : allTables) {
+                try {
+                    getTable(new Identifier(databaseName, table));
+                    result.add(table);
+                } catch (TableNotExistException ignored) {
+                }
+            }
+            return result;
         } catch (TException e) {
             throw new RuntimeException("Failed to list all tables in database " + databaseName, e);
         } catch (InterruptedException e) {
@@ -408,21 +410,6 @@ public class HiveCatalog extends AbstractCatalog {
         }
     }
 
-    @Override
-    public boolean tableExists(Identifier identifier) {
-        if (isTableInSystemDatabase(identifier)) {
-            return super.tableExists(identifier);
-        }
-
-        try {
-            Table table = getHmsTable(identifier);
-            return isPaimonTable(identifier, table)
-                    || (formatTableEnabled() && isFormatTable(table));
-        } catch (TableNotExistException e) {
-            return false;
-        }
-    }
-
     @Override
     public TableSchema getDataTableSchema(Identifier identifier) throws TableNotExistException {
         Table table = getHmsTable(identifier);
@@ -462,9 +449,7 @@ public class HiveCatalog extends AbstractCatalog {
     @Override
     public void createView(Identifier identifier, View view, boolean ignoreIfExists)
             throws ViewAlreadyExistException, DatabaseNotExistException {
-        if (!databaseExists(identifier.getDatabaseName())) {
-            throw new DatabaseNotExistException(identifier.getDatabaseName());
-        }
+        getDatabase(identifier.getDatabaseName());
 
         try {
             getView(identifier);
@@ -541,9 +526,7 @@ public class HiveCatalog extends AbstractCatalog {
         if (isSystemDatabase(databaseName)) {
             return Collections.emptyList();
         }
-        if (!databaseExists(databaseName)) {
-            throw new DatabaseNotExistException(databaseName);
-        }
+        getDatabase(databaseName);
 
         try {
             List<String> tables = clients.run(client -> client.getAllTables(databaseName));
@@ -571,15 +554,21 @@ public class HiveCatalog extends AbstractCatalog {
     @Override
     public void renameView(Identifier fromView, Identifier toView, boolean ignoreIfNotExists)
             throws ViewNotExistException, ViewAlreadyExistException {
-        if (!viewExists(fromView)) {
+        try {
+            getView(fromView);
+        } catch (ViewNotExistException e) {
             if (ignoreIfNotExists) {
                 return;
             }
             throw new ViewNotExistException(fromView);
         }
-        if (viewExists(toView)) {
+
+        try {
+            getView(toView);
             throw new ViewAlreadyExistException(toView);
+        } catch (ViewNotExistException ignored) {
         }
+
         try {
             String fromDB = fromView.getDatabaseName();
             String fromViewName = fromView.getTableName();
@@ -870,7 +859,9 @@ public class HiveCatalog extends AbstractCatalog {
         checkNotSystemDatabase(databaseName);
 
         // create database if needed
-        if (!databaseExists(databaseName)) {
+        try {
+            getDatabase(databaseName);
+        } catch (DatabaseNotExistException e) {
             createDatabaseImpl(databaseName, Collections.emptyMap());
         }
 
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
index b9928ce73..d1478830a 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.hive.migrate;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.BinaryWriter;
@@ -81,8 +82,8 @@ public class HiveMigrator implements Migrator {
     private final String targetDatabase;
     private final String targetTable;
     private final CoreOptions coreOptions;
-    private Boolean delete = true;
-    private Integer parallelism;
+
+    private Boolean deleteOriginTable = true;
 
     public HiveMigrator(
             HiveCatalog hiveCatalog,
@@ -99,7 +100,6 @@ public class HiveMigrator implements Migrator {
         this.sourceTable = sourceTable;
         this.targetDatabase = targetDatabase;
         this.targetTable = targetTable;
-        this.parallelism = parallelism;
         this.coreOptions = new CoreOptions(options);
         this.executor = createCachedThreadPool(parallelism, "HIVE_MIGRATOR");
     }
@@ -129,8 +129,8 @@ public class HiveMigrator implements Migrator {
     }
 
     @Override
-    public void deleteOriginTable(boolean delete) {
-        this.delete = delete;
+    public void deleteOriginTable(boolean deleteOriginTable) {
+        this.deleteOriginTable = deleteOriginTable;
     }
 
     @Override
@@ -145,14 +145,18 @@ public class HiveMigrator implements Migrator {
 
         // create paimon table if not exists
         Identifier identifier = Identifier.create(targetDatabase, targetTable);
-        boolean alreadyExist = hiveCatalog.tableExists(identifier);
-        if (!alreadyExist) {
+
+        boolean deleteIfFail = false;
+        try {
+            hiveCatalog.getTable(identifier);
+        } catch (Catalog.TableNotExistException e) {
             Schema schema =
                     from(
                             client.getSchema(sourceDatabase, sourceTable),
                             sourceHiveTable.getPartitionKeys(),
                             properties);
             hiveCatalog.createTable(identifier, schema, false);
+            deleteIfFail = true;
         }
 
         try {
@@ -211,14 +215,14 @@ public class HiveMigrator implements Migrator {
                 commit.commit(new ArrayList<>(commitMessages));
             }
         } catch (Exception e) {
-            if (!alreadyExist) {
+            if (deleteIfFail) {
                 hiveCatalog.dropTable(identifier, true);
             }
             throw new RuntimeException("Migrating failed", e);
         }
 
         // if all success, drop the origin table according the delete field
-        if (delete) {
+        if (deleteOriginTable) {
             client.dropTable(sourceDatabase, sourceTable, true, true);
         }
     }
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
index 5123a1ee0..37601f4f8 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
@@ -19,15 +19,15 @@
 package org.apache.paimon.hive;
 
 import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.CatalogLock;
-import org.apache.paimon.catalog.CatalogLockFactory;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.FlinkCatalog;
 import org.apache.paimon.hive.annotation.Minio;
 import org.apache.paimon.hive.runner.PaimonEmbeddedHiveRunner;
 import org.apache.paimon.metastore.MetastoreClient;
+import org.apache.paimon.operation.Lock;
 import org.apache.paimon.privilege.NoPrivilegeException;
 import org.apache.paimon.s3.MinioTestContainer;
+import org.apache.paimon.table.CatalogEnvironment;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.utils.IOUtils;
@@ -203,7 +203,7 @@ public abstract class HiveCatalogITCaseBase {
     @Test
     @LocationInProperties
     public void testDbLocationWithMetastoreLocationInProperties()
-            throws Catalog.DatabaseAlreadyExistException {
+            throws Catalog.DatabaseAlreadyExistException, Catalog.DatabaseNotExistException {
         String dbLocation = minioTestContainer.getS3UriForDefaultBucket() + "/" + UUID.randomUUID();
         Catalog catalog =
                 ((FlinkCatalog) tEnv.getCatalog(tEnv.getCurrentCatalog()).get()).catalog();
@@ -211,7 +211,7 @@ public abstract class HiveCatalogITCaseBase {
         properties.put("location", dbLocation);
 
         catalog.createDatabase("location_test_db", false, properties);
-        assertThat(catalog.databaseExists("location_test_db"));
+        catalog.getDatabase("location_test_db");
 
         hiveShell.execute("USE location_test_db");
         hiveShell.execute("CREATE TABLE location_test_db ( a INT, b INT )");
@@ -1128,11 +1128,12 @@ public abstract class HiveCatalogITCaseBase {
     }
 
     @Test
-    public void testHiveLock() throws InterruptedException {
+    public void testHiveLock() throws InterruptedException, Catalog.TableNotExistException {
         tEnv.executeSql("CREATE TABLE t (a INT)");
         Catalog catalog =
                 ((FlinkCatalog) tEnv.getCatalog(tEnv.getCurrentCatalog()).get()).catalog();
-        CatalogLockFactory lockFactory = catalog.lockFactory().get();
+        FileStoreTable table = (FileStoreTable) catalog.getTable(new Identifier("test_db", "t"));
+        CatalogEnvironment catalogEnv = table.catalogEnvironment();
 
         AtomicInteger count = new AtomicInteger(0);
         List<Thread> threads = new ArrayList<>();
@@ -1147,11 +1148,10 @@ public abstract class HiveCatalogITCaseBase {
             Thread thread =
                     new Thread(
                             () -> {
-                                CatalogLock lock =
-                                        lockFactory.createLock(catalog.lockContext().get());
+                                Lock lock = catalogEnv.lockFactory().create();
                                 for (int j = 0; j < 10; j++) {
                                     try {
-                                        lock.runWithLock("test_db", "t", unsafeIncrement);
+                                        lock.runWithLock(unsafeIncrement);
                                     } catch (Exception e) {
                                         throw new RuntimeException(e);
                                     }
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index ae5ab8b6e..2e4a2eaec 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -102,7 +102,9 @@ public class SparkCatalog extends SparkBaseCatalog implements SupportFunction {
         this.catalog = CatalogFactory.createCatalog(catalogContext);
         this.defaultDatabase =
                 options.getOrDefault(DEFAULT_DATABASE.key(), DEFAULT_DATABASE.defaultValue());
-        if (!catalog.databaseExists(defaultNamespace()[0])) {
+        try {
+            catalog.getDatabase(defaultNamespace()[0]);
+        } catch (Catalog.DatabaseNotExistException e) {
             try {
                 createNamespace(defaultNamespace(), new HashMap<>());
             } catch (NamespaceAlreadyExistsException ignored) {
@@ -152,10 +154,12 @@ public class SparkCatalog extends SparkBaseCatalog implements SupportFunction {
         if (!isValidateNamespace(namespace)) {
             throw new NoSuchNamespaceException(namespace);
         }
-        if (catalog.databaseExists(namespace[0])) {
+        try {
+            catalog.getDatabase(namespace[0]);
             return new String[0][];
+        } catch (Catalog.DatabaseNotExistException e) {
+            throw new NoSuchNamespaceException(namespace);
         }
-        throw new NoSuchNamespaceException(namespace);
     }
 
     @Override
@@ -167,7 +171,7 @@ public class SparkCatalog extends SparkBaseCatalog implements SupportFunction {
                 Arrays.toString(namespace));
         String dataBaseName = namespace[0];
         try {
-            return catalog.loadDatabaseProperties(dataBaseName);
+            return catalog.getDatabase(dataBaseName).options();
         } catch (Catalog.DatabaseNotExistException e) {
             throw new NoSuchNamespaceException(namespace);
         }
@@ -284,15 +288,6 @@ public class SparkCatalog extends SparkBaseCatalog implements SupportFunction {
         }
     }
 
-    @Override
-    public boolean tableExists(Identifier ident) {
-        try {
-            return catalog.tableExists(toIdentifier(ident));
-        } catch (NoSuchTableException e) {
-            return false;
-        }
-    }
-
     @Override
     public org.apache.spark.sql.connector.catalog.Table alterTable(
             Identifier ident, TableChange... changes) throws NoSuchTableException {
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateFileProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateFileProcedure.java
index 32f89d47b..95d55df01 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateFileProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateFileProcedure.java
@@ -88,7 +88,9 @@ public class MigrateFileProcedure extends BaseProcedure {
 
         Catalog paimonCatalog = ((WithPaimonCatalog) tableCatalog()).paimonCatalog();
 
-        if (!(paimonCatalog.tableExists(targetTableId))) {
+        try {
+            paimonCatalog.getTable(targetTableId);
+        } catch (Catalog.TableNotExistException e) {
             throw new IllegalArgumentException(
                     "Target paimon table does not exist: " + targetTable);
         }

Reply via email to