This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new cfeb22ff3 [catalog] Refactor Catalog Factory and revert jdbc lock in 
FileSystemCatalog (#3099)
cfeb22ff3 is described below

commit cfeb22ff3b985a78a99fa71ceecc3fe808ef9440
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Mar 27 13:07:31 2024 +0800

    [catalog] Refactor Catalog Factory and revert jdbc lock in 
FileSystemCatalog (#3099)
---
 docs/content/how-to/creating-catalogs.md           | 10 ---
 .../org/apache/paimon/catalog/AbstractCatalog.java | 39 ++++-----
 .../java/org/apache/paimon/catalog/Catalog.java    |  4 +-
 .../org/apache/paimon/catalog/CatalogLock.java     | 10 ---
 .../{CatalogLock.java => CatalogLockContext.java}  | 25 ++----
 .../{CatalogLock.java => CatalogLockFactory.java}  | 23 +----
 .../apache/paimon/catalog/FileSystemCatalog.java   | 18 +---
 .../apache/paimon/catalog/LockContextUtils.java    | 98 ----------------------
 .../java/org/apache/paimon/jdbc/JdbcCatalog.java   | 12 ++-
 .../org/apache/paimon/jdbc/JdbcCatalogFactory.java |  8 --
 .../org/apache/paimon/jdbc/JdbcCatalogLock.java    | 36 --------
 .../apache/paimon/jdbc/JdbcCatalogLockContext.java | 49 +++++++++++
 .../apache/paimon/jdbc/JdbcCatalogLockFactory.java | 52 ++++++++++++
 .../java/org/apache/paimon/operation/Lock.java     | 22 ++---
 .../services/org.apache.paimon.factories.Factory   |  2 +-
 .../paimon/catalog/FileSystemCatalogTest.java      | 68 ---------------
 .../paimon/flink/FileSystemCatalogITCase.java      | 19 ++---
 .../java/org/apache/paimon/hive/HiveCatalog.java   | 32 +++----
 .../org/apache/paimon/hive/HiveCatalogLock.java    | 33 --------
 .../apache/paimon/hive/HiveCatalogLockContext.java | 50 +++++++++++
 .../apache/paimon/hive/HiveCatalogLockFactory.java | 52 ++++++++++++
 .../services/org.apache.paimon.factories.Factory   |  4 +-
 .../apache/paimon/hive/HiveCatalogITCaseBase.java  |  6 +-
 23 files changed, 278 insertions(+), 394 deletions(-)

diff --git a/docs/content/how-to/creating-catalogs.md 
b/docs/content/how-to/creating-catalogs.md
index fc2927d41..536a6b165 100644
--- a/docs/content/how-to/creating-catalogs.md
+++ b/docs/content/how-to/creating-catalogs.md
@@ -53,16 +53,6 @@ USE CATALOG my_catalog;
 
 You can define any default table options with the prefix `table-default.` for 
tables created in the catalog.
 
-The FileSystem catalog supports jdbc lock and can take effect through the 
following configuration:
-
-> ```shell
-> 'uri' = 'jdbc:mysql://<host>:<port>/<databaseName>'
-> 'jdbc.user' = '...',
-> 'jdbc.password' = '...',
-> ```
-
-
-
 {{< /tab >}}
 
 {{< tab "Spark3" >}}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index c69a72b0d..4a4fb04fd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -83,26 +83,31 @@ public abstract class AbstractCatalog implements Catalog {
         this.tableDefaultOptions =
                 convertToPropertiesPrefixKey(options.toMap(), 
TABLE_DEFAULT_OPTION_PREFIX);
         this.catalogOptions = options;
+    }
 
-        if (lockEnabled()) {
-            checkArgument(options.contains(LOCK_TYPE), "No lock type when lock 
is enabled.");
+    @Override
+    public Optional<CatalogLockFactory> lockFactory() {
+        if (!lockEnabled()) {
+            return Optional.empty();
         }
+
+        String lock = catalogOptions.get(LOCK_TYPE);
+        if (lock == null) {
+            return defaultLockFactory();
+        }
+
+        return Optional.of(
+                FactoryUtil.discoverFactory(
+                        AbstractCatalog.class.getClassLoader(), 
CatalogLockFactory.class, lock));
     }
 
-    @Override
-    public Optional<CatalogLock.LockFactory> lockFactory() {
-        return lockEnabled()
-                ? Optional.of(
-                        FactoryUtil.discoverFactory(
-                                AbstractCatalog.class.getClassLoader(),
-                                CatalogLock.LockFactory.class,
-                                catalogOptions.get(LOCK_TYPE)))
-                : Optional.empty();
+    public Optional<CatalogLockFactory> defaultLockFactory() {
+        return Optional.empty();
     }
 
     @Override
-    public Optional<CatalogLock.LockContext> lockContext() {
-        return Optional.of(new OptionLockContext(catalogOptions));
+    public Optional<CatalogLockContext> lockContext() {
+        return Optional.of(CatalogLockContext.fromOptions(catalogOptions));
     }
 
     protected boolean lockEnabled() {
@@ -492,12 +497,4 @@ public abstract class AbstractCatalog implements Catalog {
                         "The value of %s property should be %s.",
                         CoreOptions.AUTO_CREATE.key(), Boolean.FALSE));
     }
-
-    static class OptionLockContext implements CatalogLock.LockContext {
-        private final Options catalogOptions;
-
-        public OptionLockContext(Options catalogOptions) {
-            this.catalogOptions = catalogOptions;
-        }
-    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index 0168891b0..99b71e8de 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -49,10 +49,10 @@ public interface Catalog extends AutoCloseable {
      * Get lock factory from catalog. Lock is used to support multiple 
concurrent writes on the
      * object store.
      */
-    Optional<CatalogLock.LockFactory> lockFactory();
+    Optional<CatalogLockFactory> lockFactory();
 
     /** Get lock context for lock factory to create a lock. */
-    default Optional<CatalogLock.LockContext> lockContext() {
+    default Optional<CatalogLockContext> lockContext() {
         return Optional.empty();
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLock.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLock.java
index 0e547037e..4f147c6aa 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLock.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLock.java
@@ -19,10 +19,8 @@
 package org.apache.paimon.catalog;
 
 import org.apache.paimon.annotation.Public;
-import org.apache.paimon.factories.Factory;
 
 import java.io.Closeable;
-import java.io.Serializable;
 import java.util.concurrent.Callable;
 
 /**
@@ -35,12 +33,4 @@ public interface CatalogLock extends Closeable {
 
     /** Run with catalog lock. The caller should tell catalog the database and 
table name. */
     <T> T runWithLock(String database, String table, Callable<T> callable) 
throws Exception;
-
-    /** Factory to create {@link CatalogLock}. */
-    interface LockFactory extends Factory, Serializable {
-        CatalogLock create(LockContext context);
-    }
-
-    /** Context for lock factory to create lock. */
-    interface LockContext extends Serializable {}
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLock.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLockContext.java
similarity index 53%
copy from paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLock.java
copy to 
paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLockContext.java
index 0e547037e..442409c81 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLock.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLockContext.java
@@ -18,29 +18,16 @@
 
 package org.apache.paimon.catalog;
 
-import org.apache.paimon.annotation.Public;
-import org.apache.paimon.factories.Factory;
+import org.apache.paimon.options.Options;
 
-import java.io.Closeable;
 import java.io.Serializable;
-import java.util.concurrent.Callable;
 
-/**
- * An interface that allows source and sink to use global lock to some 
transaction-related things.
- *
- * @since 0.4.0
- */
-@Public
-public interface CatalogLock extends Closeable {
+/** Context for lock factory to create lock. */
+public interface CatalogLockContext extends Serializable {
 
-    /** Run with catalog lock. The caller should tell catalog the database and 
table name. */
-    <T> T runWithLock(String database, String table, Callable<T> callable) 
throws Exception;
+    Options options();
 
-    /** Factory to create {@link CatalogLock}. */
-    interface LockFactory extends Factory, Serializable {
-        CatalogLock create(LockContext context);
+    static CatalogLockContext fromOptions(Options options) {
+        return () -> options;
     }
-
-    /** Context for lock factory to create lock. */
-    interface LockContext extends Serializable {}
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLock.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLockFactory.java
similarity index 55%
copy from paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLock.java
copy to 
paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLockFactory.java
index 0e547037e..d964ebda4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLock.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLockFactory.java
@@ -18,29 +18,12 @@
 
 package org.apache.paimon.catalog;
 
-import org.apache.paimon.annotation.Public;
 import org.apache.paimon.factories.Factory;
 
-import java.io.Closeable;
 import java.io.Serializable;
-import java.util.concurrent.Callable;
 
-/**
- * An interface that allows source and sink to use global lock to some 
transaction-related things.
- *
- * @since 0.4.0
- */
-@Public
-public interface CatalogLock extends Closeable {
-
-    /** Run with catalog lock. The caller should tell catalog the database and 
table name. */
-    <T> T runWithLock(String database, String table, Callable<T> callable) 
throws Exception;
-
-    /** Factory to create {@link CatalogLock}. */
-    interface LockFactory extends Factory, Serializable {
-        CatalogLock create(LockContext context);
-    }
+/** Factory to create {@link CatalogLock}. */
+public interface CatalogLockFactory extends Factory, Serializable {
 
-    /** Context for lock factory to create lock. */
-    interface LockContext extends Serializable {}
+    CatalogLock createLock(CatalogLockContext context);
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
index 2c1d1e8df..8ffe0f271 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.catalog;
 
-import org.apache.paimon.client.ClientPool;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
@@ -36,7 +35,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.concurrent.Callable;
 
 import static 
org.apache.paimon.catalog.FileSystemCatalogOptions.CASE_SENSITIVE;
@@ -48,8 +46,6 @@ public class FileSystemCatalog extends AbstractCatalog {
 
     private final Path warehouse;
 
-    private ClientPool.ClientPoolImpl clientPool;
-
     public FileSystemCatalog(FileIO fileIO, Path warehouse) {
         super(fileIO);
         this.warehouse = warehouse;
@@ -149,7 +145,7 @@ public class FileSystemCatalog extends AbstractCatalog {
                 lockFactory()
                         .map(
                                 fac ->
-                                        fac.create(
+                                        fac.createLock(
                                                 lockContext()
                                                         .orElseThrow(
                                                                 () ->
@@ -160,14 +156,6 @@ public class FileSystemCatalog extends AbstractCatalog {
                 .withLock(catalogLock == null ? null : 
Lock.fromCatalog(catalogLock, identifier));
     }
 
-    @Override
-    public Optional<CatalogLock.LockContext> lockContext() {
-        if (clientPool == null) {
-            this.clientPool = 
LockContextUtils.tryInitializeClientPool(catalogOptions);
-        }
-        return LockContextUtils.lockContext(this.clientPool, catalogOptions, 
"filesystem");
-    }
-
     @Override
     public void renameTableImpl(Identifier fromTable, Identifier toTable) {
         Path fromPath = getDataTableLocation(fromTable);
@@ -199,9 +187,7 @@ public class FileSystemCatalog extends AbstractCatalog {
     }
 
     @Override
-    public void close() throws Exception {
-        LockContextUtils.close(clientPool);
-    }
+    public void close() throws Exception {}
 
     @Override
     public String warehouse() {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/LockContextUtils.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/LockContextUtils.java
deleted file mode 100644
index 699c54de4..000000000
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/LockContextUtils.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.catalog;
-
-import org.apache.paimon.client.ClientPool;
-import org.apache.paimon.jdbc.JdbcCatalogFactory;
-import org.apache.paimon.jdbc.JdbcCatalogLock;
-import org.apache.paimon.jdbc.JdbcClientPool;
-import org.apache.paimon.jdbc.JdbcUtils;
-import org.apache.paimon.options.CatalogOptions;
-import org.apache.paimon.options.Options;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.sql.SQLException;
-import java.util.Optional;
-
-/** Utils for {@link org.apache.paimon.catalog.CatalogLock.LockContext}. */
-public class LockContextUtils {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(FileSystemCatalog.class);
-
-    public static Optional<CatalogLock.LockContext> lockContext(
-            ClientPool.ClientPoolImpl clientPool, Options catalogOptions, 
String catalogKey) {
-        if (clientPool == null) {
-            return Optional.of(new 
AbstractCatalog.OptionLockContext(catalogOptions));
-        }
-        String lockType = catalogOptions.get(CatalogOptions.LOCK_TYPE);
-        switch (lockType) {
-            case JdbcCatalogFactory.IDENTIFIER:
-                JdbcClientPool connections = (JdbcClientPool) clientPool;
-                return Optional.of(
-                        new JdbcCatalogLock.JdbcLockContext(
-                                connections, catalogKey, catalogOptions));
-            default:
-                LOG.warn("Unsupported lock type:" + lockType);
-                return Optional.of(new 
AbstractCatalog.OptionLockContext(catalogOptions));
-        }
-    }
-
-    public static ClientPool.ClientPoolImpl tryInitializeClientPool(Options 
catalogOptions) {
-        String lockType = catalogOptions.get(CatalogOptions.LOCK_TYPE);
-        if (lockType == null) {
-            return null;
-        }
-        switch (lockType) {
-            case JdbcCatalogFactory.IDENTIFIER:
-                JdbcClientPool connections =
-                        new JdbcClientPool(
-                                
catalogOptions.get(CatalogOptions.CLIENT_POOL_SIZE),
-                                catalogOptions.get(CatalogOptions.URI.key()),
-                                catalogOptions.toMap());
-                try {
-                    JdbcUtils.createDistributedLockTable(connections, 
catalogOptions);
-                } catch (SQLException e) {
-                    throw new RuntimeException("Cannot initialize JDBC 
distributed lock.", e);
-                } catch (InterruptedException e) {
-                    throw new RuntimeException("Interrupted in call to 
initialize", e);
-                }
-                return connections;
-            default:
-                LOG.warn("Unsupported lock type:" + lockType);
-                return null;
-        }
-    }
-
-    public static void close(ClientPool.ClientPoolImpl clientPool) {
-        if (clientPool == null) {
-            return;
-        }
-        if (clientPool instanceof JdbcClientPool) {
-            JdbcClientPool connections = (JdbcClientPool) clientPool;
-            if (!connections.isClosed()) {
-                connections.close();
-            }
-        } else {
-            clientPool.close();
-        }
-        clientPool = null;
-    }
-}
diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
index 91c11ac24..7e7718b5b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
@@ -20,7 +20,8 @@ package org.apache.paimon.jdbc;
 
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.catalog.AbstractCatalog;
-import org.apache.paimon.catalog.CatalogLock;
+import org.apache.paimon.catalog.CatalogLockContext;
+import org.apache.paimon.catalog.CatalogLockFactory;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
@@ -343,8 +344,13 @@ public class JdbcCatalog extends AbstractCatalog {
     }
 
     @Override
-    public Optional<CatalogLock.LockContext> lockContext() {
-        return Optional.of(new JdbcCatalogLock.JdbcLockContext(connections, 
catalogKey, options));
+    public Optional<CatalogLockFactory> defaultLockFactory() {
+        return Optional.of(new JdbcCatalogLockFactory());
+    }
+
+    @Override
+    public Optional<CatalogLockContext> lockContext() {
+        return Optional.of(new JdbcCatalogLockContext(connections, catalogKey, 
options));
     }
 
     private Lock lock(Identifier identifier) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java
index adaaf3f43..6c3c1d0e4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java
@@ -25,9 +25,6 @@ import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.options.Options;
 
-import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
-import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
-
 /** Factory to create {@link JdbcCatalog}. */
 public class JdbcCatalogFactory implements CatalogFactory {
 
@@ -42,11 +39,6 @@ public class JdbcCatalogFactory implements CatalogFactory {
     public Catalog create(FileIO fileIO, Path warehouse, CatalogContext 
context) {
         Options options = context.options();
         String catalogKey = options.get(JdbcCatalogOptions.CATALOG_KEY);
-        if (options.get(LOCK_ENABLED)) {
-            if (!options.getOptional(LOCK_TYPE).isPresent()) {
-                options.set(LOCK_TYPE, 
JdbcCatalogLock.JdbcCatalogLockFactory.IDENTIFIER);
-            }
-        }
         return new JdbcCatalog(fileIO, catalogKey, context.options(), 
warehouse.toString());
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java 
b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java
index 2d409f12a..307f92f0a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.jdbc;
 
 import org.apache.paimon.catalog.CatalogLock;
-import org.apache.paimon.options.Options;
 import org.apache.paimon.utils.TimeUtils;
 
 import java.io.IOException;
@@ -87,41 +86,6 @@ public class JdbcCatalogLock implements CatalogLock {
         // Do nothing
     }
 
-    /** Jdbc catalog lock factory. */
-    public static class JdbcCatalogLockFactory implements LockFactory {
-
-        private static final long serialVersionUID = 1L;
-        public static final String IDENTIFIER = "jdbc";
-
-        @Override
-        public String identifier() {
-            return IDENTIFIER;
-        }
-
-        @Override
-        public CatalogLock create(LockContext context) {
-            JdbcLockContext lockContext = (JdbcLockContext) context;
-            return new JdbcCatalogLock(
-                    lockContext.connections,
-                    lockContext.catalogKey,
-                    checkMaxSleep(lockContext.conf.toMap()),
-                    acquireTimeout(lockContext.conf.toMap()));
-        }
-    }
-
-    /** Jdbc lock context. */
-    public static class JdbcLockContext implements LockContext {
-        private final JdbcClientPool connections;
-        private final String catalogKey;
-        private final Options conf;
-
-        public JdbcLockContext(JdbcClientPool connections, String catalogKey, 
Options conf) {
-            this.connections = connections;
-            this.catalogKey = catalogKey;
-            this.conf = conf;
-        }
-    }
-
     public static long checkMaxSleep(Map<String, String> conf) {
         return TimeUtils.parseDuration(
                         conf.getOrDefault(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLockContext.java 
b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLockContext.java
new file mode 100644
index 000000000..e56b3474c
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLockContext.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.jdbc;
+
+import org.apache.paimon.catalog.CatalogLockContext;
+import org.apache.paimon.options.Options;
+
+/** Jdbc lock context. */
+public class JdbcCatalogLockContext implements CatalogLockContext {
+
+    private final JdbcClientPool connections;
+    private final String catalogKey;
+    private final Options options;
+
+    public JdbcCatalogLockContext(JdbcClientPool connections, String 
catalogKey, Options options) {
+        this.connections = connections;
+        this.catalogKey = catalogKey;
+        this.options = options;
+    }
+
+    @Override
+    public Options options() {
+        return options;
+    }
+
+    public JdbcClientPool connections() {
+        return connections;
+    }
+
+    public String catalogKey() {
+        return catalogKey;
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLockFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLockFactory.java
new file mode 100644
index 000000000..ce0a2d24e
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLockFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.jdbc;
+
+import org.apache.paimon.catalog.CatalogLock;
+import org.apache.paimon.catalog.CatalogLockContext;
+import org.apache.paimon.catalog.CatalogLockFactory;
+
+import java.util.Map;
+
+import static org.apache.paimon.jdbc.JdbcCatalogLock.acquireTimeout;
+import static org.apache.paimon.jdbc.JdbcCatalogLock.checkMaxSleep;
+
+/** Jdbc catalog lock factory. */
+public class JdbcCatalogLockFactory implements CatalogLockFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final String IDENTIFIER = "jdbc";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public CatalogLock createLock(CatalogLockContext context) {
+        JdbcCatalogLockContext lockContext = (JdbcCatalogLockContext) context;
+        Map<String, String> optionsMap = lockContext.options().toMap();
+        return new JdbcCatalogLock(
+                lockContext.connections(),
+                lockContext.catalogKey(),
+                checkMaxSleep(optionsMap),
+                acquireTimeout(optionsMap));
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/Lock.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/Lock.java
index a9f27e70a..76cd8b178 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/Lock.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/Lock.java
@@ -20,6 +20,8 @@ package org.apache.paimon.operation;
 
 import org.apache.paimon.annotation.Public;
 import org.apache.paimon.catalog.CatalogLock;
+import org.apache.paimon.catalog.CatalogLockContext;
+import org.apache.paimon.catalog.CatalogLockFactory;
 import org.apache.paimon.catalog.Identifier;
 
 import javax.annotation.Nullable;
@@ -44,12 +46,12 @@ public interface Lock extends AutoCloseable {
     }
 
     static Factory factory(
-            @Nullable CatalogLock.LockFactory lockFactory,
-            @Nullable CatalogLock.LockContext lockContext,
+            @Nullable CatalogLockFactory lockFactory,
+            @Nullable CatalogLockContext lockContext,
             Identifier tablePath) {
         return lockFactory == null
                 ? new EmptyFactory()
-                : new CatalogLockFactory(lockFactory, lockContext, tablePath);
+                : new LockFactory(lockFactory, lockContext, tablePath);
     }
 
     static Factory emptyFactory() {
@@ -57,17 +59,17 @@ public interface Lock extends AutoCloseable {
     }
 
     /** A {@link Factory} creating lock from catalog. */
-    class CatalogLockFactory implements Factory {
+    class LockFactory implements Factory {
 
         private static final long serialVersionUID = 1L;
 
-        private final CatalogLock.LockFactory lockFactory;
-        private final CatalogLock.LockContext lockContext;
+        private final CatalogLockFactory lockFactory;
+        private final CatalogLockContext lockContext;
         private final Identifier tablePath;
 
-        public CatalogLockFactory(
-                CatalogLock.LockFactory lockFactory,
-                CatalogLock.LockContext lockContext,
+        public LockFactory(
+                CatalogLockFactory lockFactory,
+                CatalogLockContext lockContext,
                 Identifier tablePath) {
             this.lockFactory = lockFactory;
             this.lockContext = lockContext;
@@ -76,7 +78,7 @@ public interface Lock extends AutoCloseable {
 
         @Override
         public Lock create() {
-            return fromCatalog(lockFactory.create(lockContext), tablePath);
+            return fromCatalog(lockFactory.createLock(lockContext), tablePath);
         }
     }
 
diff --git 
a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
 
b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 34de4106b..0f87c96b0 100644
--- 
a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ 
b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -15,4 +15,4 @@
 
 org.apache.paimon.catalog.FileSystemCatalogFactory
 org.apache.paimon.jdbc.JdbcCatalogFactory
-org.apache.paimon.jdbc.JdbcCatalogLock$JdbcCatalogLockFactory
+org.apache.paimon.jdbc.JdbcCatalogLockFactory
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java
deleted file mode 100644
index 0948ab07c..000000000
--- 
a/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.catalog;
-
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.jdbc.JdbcCatalog;
-import org.apache.paimon.options.CatalogOptions;
-import org.apache.paimon.options.Options;
-
-import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;
-
-import org.junit.jupiter.api.BeforeEach;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Tests for {@link FileSystemCatalog}. */
-public class FileSystemCatalogTest extends CatalogTestBase {
-
-    @BeforeEach
-    public void setUp() throws Exception {
-        super.setUp();
-        catalog = initCatalog(Maps.newHashMap());
-    }
-
-    private FileSystemCatalog initCatalog(Map<String, String> props) {
-        Map<String, String> properties = Maps.newHashMap();
-        properties.put(
-                CatalogOptions.URI.key(),
-                "jdbc:sqlite:file::memory:?ic" + 
UUID.randomUUID().toString().replace("-", ""));
-
-        properties.put(JdbcCatalog.PROPERTY_PREFIX + "username", "user");
-        properties.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password");
-        properties.put(CatalogOptions.WAREHOUSE.key(), warehouse);
-        properties.put(CatalogOptions.LOCK_ENABLED.key(), "true");
-        properties.put(CatalogOptions.LOCK_TYPE.key(), "jdbc");
-        properties.putAll(props);
-        FileSystemCatalog catalog =
-                new FileSystemCatalog(fileIO, new Path(warehouse), 
Options.fromMap(properties));
-        return catalog;
-    }
-
-    @Override
-    public void testListDatabasesWhenNoDatabases() {
-        List<String> databases = catalog.listDatabases();
-        assertThat(databases).isEqualTo(new ArrayList<>());
-    }
-}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
index b68d65dd2..50d28cd11 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
@@ -21,6 +21,8 @@ package org.apache.paimon.flink;
 import org.apache.paimon.catalog.AbstractCatalog;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogLock;
+import org.apache.paimon.catalog.CatalogLockContext;
+import org.apache.paimon.catalog.CatalogLockFactory;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.util.AbstractTestBase;
 import org.apache.paimon.fs.Path;
@@ -154,18 +156,6 @@ public class FileSystemCatalogITCase extends 
AbstractTestBase {
     @Test
     void testCatalogWithLockForSchema() throws Exception {
         LOCK_COUNT.set(0);
-        assertThatThrownBy(
-                        () ->
-                                tEnv.executeSql(
-                                                String.format(
-                                                        "CREATE CATALOG 
fs_with_lock WITH ("
-                                                                + 
"'type'='paimon', "
-                                                                + 
"'warehouse'='%s', "
-                                                                + 
"'lock.enabled'='true'"
-                                                                + ")",
-                                                        path))
-                                        .await())
-                .hasRootCauseMessage("No lock type when lock is enabled.");
         tEnv.executeSql(
                         String.format(
                                 "CREATE CATALOG fs_with_lock WITH ("
@@ -203,7 +193,8 @@ public class FileSystemCatalogITCase extends 
AbstractTestBase {
     }
 
     /** Lock factory for file system catalog. */
-    public static class FileSystemCatalogDummyLockFactory implements 
CatalogLock.LockFactory {
+    public static class FileSystemCatalogDummyLockFactory implements 
CatalogLockFactory {
+
         private static final String IDENTIFIER = "DUMMY";
 
         @Override
@@ -212,7 +203,7 @@ public class FileSystemCatalogITCase extends 
AbstractTestBase {
         }
 
         @Override
-        public CatalogLock create(CatalogLock.LockContext context) {
+        public CatalogLock createLock(CatalogLockContext context) {
             return new CatalogLock() {
                 @Override
                 public <T> T runWithLock(String database, String table, 
Callable<T> callable)
diff --git 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 589e92037..372bfedef 100644
--- 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -23,7 +23,8 @@ import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.catalog.AbstractCatalog;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.catalog.CatalogLock;
+import org.apache.paimon.catalog.CatalogLockContext;
+import org.apache.paimon.catalog.CatalogLockFactory;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
@@ -77,15 +78,12 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE;
-import static org.apache.paimon.hive.HiveCatalogLock.LOCK_IDENTIFIER;
 import static org.apache.paimon.hive.HiveCatalogLock.acquireTimeout;
 import static org.apache.paimon.hive.HiveCatalogLock.checkMaxSleep;
 import static org.apache.paimon.hive.HiveCatalogOptions.HADOOP_CONF_DIR;
 import static org.apache.paimon.hive.HiveCatalogOptions.HIVE_CONF_DIR;
 import static org.apache.paimon.hive.HiveCatalogOptions.IDENTIFIER;
 import static org.apache.paimon.hive.HiveCatalogOptions.LOCATION_IN_PROPERTIES;
-import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
-import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
 import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE;
 import static 
org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -93,6 +91,7 @@ import static 
org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly;
 
 /** A catalog implementation for Hive. */
 public class HiveCatalog extends AbstractCatalog {
+
     private static final Logger LOG = 
LoggerFactory.getLogger(HiveCatalog.class);
 
     // Reserved properties
@@ -149,10 +148,15 @@ public class HiveCatalog extends AbstractCatalog {
     }
 
     @Override
-    public Optional<CatalogLock.LockContext> lockContext() {
+    public Optional<CatalogLockFactory> defaultLockFactory() {
+        return Optional.of(new HiveCatalogLockFactory());
+    }
+
+    @Override
+    public Optional<CatalogLockContext> lockContext() {
         return Optional.of(
-                new HiveCatalogLock.HiveLockContext(
-                        new SerializableHiveConf(hiveConf), clientClassName));
+                new HiveCatalogLockContext(
+                        new SerializableHiveConf(hiveConf), clientClassName, 
catalogOptions));
     }
 
     @Override
@@ -635,7 +639,7 @@ public class HiveCatalog extends AbstractCatalog {
             try (InputStream inputStream = 
hiveSite.getFileSystem(hadoopConf).open(hiveSite)) {
                 hiveConf.addResource(inputStream, hiveSite.toString());
                 // trigger a read from the conf to avoid input stream is closed
-                isEmbeddedMetastore(hiveConf);
+                hiveConf.getVar(HiveConf.ConfVars.METASTOREURIS);
             } catch (IOException e) {
                 throw new RuntimeException(
                         "Failed to load hive-site.xml from specified path:" + 
hiveSite, e);
@@ -656,10 +660,6 @@ public class HiveCatalog extends AbstractCatalog {
         }
     }
 
-    public static boolean isEmbeddedMetastore(HiveConf hiveConf) {
-        return 
isNullOrWhitespaceOnly(hiveConf.getVar(HiveConf.ConfVars.METASTOREURIS));
-    }
-
     public static Catalog createHiveCatalog(CatalogContext context) {
         HiveConf hiveConf = createHiveConf(context);
         Options options = context.options();
@@ -680,14 +680,6 @@ public class HiveCatalog extends AbstractCatalog {
         } catch (IOException e) {
             throw new UncheckedIOException(e);
         }
-
-        /** Hive catalog only support hive lock. */
-        if (options.getOptional(LOCK_ENABLED).orElse(false)) {
-            Optional<String> lockType = options.getOptional(LOCK_TYPE);
-            if (!lockType.isPresent()) {
-                options.set(LOCK_TYPE, LOCK_IDENTIFIER);
-            }
-        }
         return new HiveCatalog(
                 fileIO,
                 hiveConf,
diff --git 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java
 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java
index c49cd020c..8c3d3829e 100644
--- 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java
+++ 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java
@@ -39,7 +39,6 @@ import java.util.concurrent.Callable;
 
 import static org.apache.paimon.options.CatalogOptions.LOCK_ACQUIRE_TIMEOUT;
 import static org.apache.paimon.options.CatalogOptions.LOCK_CHECK_MAX_SLEEP;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Hive {@link CatalogLock}. */
 public class HiveCatalogLock implements CatalogLock {
@@ -114,28 +113,6 @@ public class HiveCatalogLock implements CatalogLock {
         this.client.close();
     }
 
-    /** Catalog lock factory for hive. */
-    public static class HiveCatalogLockFactory implements LockFactory {
-
-        private static final long serialVersionUID = 1L;
-
-        @Override
-        public CatalogLock create(LockContext context) {
-            checkArgument(context instanceof HiveLockContext);
-            HiveLockContext hiveLockContext = (HiveLockContext) context;
-            HiveConf conf = hiveLockContext.hiveConf.conf();
-            return new HiveCatalogLock(
-                    HiveCatalog.createClient(conf, 
hiveLockContext.clientClassName),
-                    checkMaxSleep(conf),
-                    acquireTimeout(conf));
-        }
-
-        @Override
-        public String identifier() {
-            return LOCK_IDENTIFIER;
-        }
-    }
-
     public static long checkMaxSleep(HiveConf conf) {
         return TimeUtils.parseDuration(
                         conf.get(
@@ -151,14 +128,4 @@ public class HiveCatalogLock implements CatalogLock {
                                 
TimeUtils.getStringInMillis(LOCK_ACQUIRE_TIMEOUT.defaultValue())))
                 .toMillis();
     }
-
-    static class HiveLockContext implements LockContext {
-        private final SerializableHiveConf hiveConf;
-        private final String clientClassName;
-
-        public HiveLockContext(SerializableHiveConf hiveConf, String 
clientClassName) {
-            this.hiveConf = hiveConf;
-            this.clientClassName = clientClassName;
-        }
-    }
 }
diff --git 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLockContext.java
 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLockContext.java
new file mode 100644
index 000000000..ecffd7f1e
--- /dev/null
+++ 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLockContext.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hive;
+
+import org.apache.paimon.catalog.CatalogLockContext;
+import org.apache.paimon.options.Options;
+
+/** Hive {@link CatalogLockContext}. */
+public class HiveCatalogLockContext implements CatalogLockContext {
+
+    private final SerializableHiveConf hiveConf;
+    private final String clientClassName;
+    private final Options options;
+
+    public HiveCatalogLockContext(
+            SerializableHiveConf hiveConf, String clientClassName, Options 
options) {
+        this.hiveConf = hiveConf;
+        this.clientClassName = clientClassName;
+        this.options = options;
+    }
+
+    @Override
+    public Options options() {
+        return options;
+    }
+
+    public SerializableHiveConf hiveConf() {
+        return hiveConf;
+    }
+
+    public String clientClassName() {
+        return clientClassName;
+    }
+}
diff --git 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLockFactory.java
 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLockFactory.java
new file mode 100644
index 000000000..7c05ce3ee
--- /dev/null
+++ 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLockFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hive;
+
+import org.apache.paimon.catalog.CatalogLock;
+import org.apache.paimon.catalog.CatalogLockContext;
+import org.apache.paimon.catalog.CatalogLockFactory;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+
+import static org.apache.paimon.hive.HiveCatalogLock.LOCK_IDENTIFIER;
+import static org.apache.paimon.hive.HiveCatalogLock.acquireTimeout;
+import static org.apache.paimon.hive.HiveCatalogLock.checkMaxSleep;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** Catalog lock factory for hive. */
+public class HiveCatalogLockFactory implements CatalogLockFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public CatalogLock createLock(CatalogLockContext context) {
+        checkArgument(context instanceof HiveCatalogLockContext);
+        HiveCatalogLockContext hiveLockContext = (HiveCatalogLockContext) 
context;
+        HiveConf conf = hiveLockContext.hiveConf().conf();
+        return new HiveCatalogLock(
+                HiveCatalog.createClient(conf, 
hiveLockContext.clientClassName()),
+                checkMaxSleep(conf),
+                acquireTimeout(conf));
+    }
+
+    @Override
+    public String identifier() {
+        return LOCK_IDENTIFIER;
+    }
+}
diff --git 
a/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
 
b/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index d4af13cc0..baab92184 100644
--- 
a/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ 
b/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -14,6 +14,4 @@
 # limitations under the License.
 
 org.apache.paimon.hive.HiveCatalogFactory
-
-# Hive catalog lock factory
-org.apache.paimon.hive.HiveCatalogLock$HiveCatalogLockFactory
+org.apache.paimon.hive.HiveCatalogLockFactory
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
index 668c88f1f..4d9753bab 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
@@ -20,6 +20,7 @@ package org.apache.paimon.hive;
 
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogLock;
+import org.apache.paimon.catalog.CatalogLockFactory;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.FlinkCatalog;
 import org.apache.paimon.hive.annotation.Minio;
@@ -695,7 +696,7 @@ public abstract class HiveCatalogITCaseBase {
         tEnv.executeSql("CREATE TABLE t (a INT)");
         Catalog catalog =
                 ((FlinkCatalog) 
tEnv.getCatalog(tEnv.getCurrentCatalog()).get()).catalog();
-        CatalogLock.LockFactory lockFactory = catalog.lockFactory().get();
+        CatalogLockFactory lockFactory = catalog.lockFactory().get();
 
         AtomicInteger count = new AtomicInteger(0);
         List<Thread> threads = new ArrayList<>();
@@ -710,7 +711,8 @@ public abstract class HiveCatalogITCaseBase {
             Thread thread =
                     new Thread(
                             () -> {
-                                CatalogLock lock = 
lockFactory.create(catalog.lockContext().get());
+                                CatalogLock lock =
+                                        
lockFactory.createLock(catalog.lockContext().get());
                                 for (int j = 0; j < 10; j++) {
                                     try {
                                         lock.runWithLock("test_db", "t", 
unsafeIncrement);

Reply via email to