This is an automated email from the ASF dual-hosted git repository.

zjureel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a0aec65e [core] Support configuring lock in paimon catalog (#2933)
8a0aec65e is described below

commit 8a0aec65ee5c91857fd15d254a34ad0b9daab8f7
Author: Fang Yong <[email protected]>
AuthorDate: Fri Mar 15 16:27:09 2024 +0800

    [core] Support configuring lock in paimon catalog (#2933)
    
    * [core] Support configuring lock in paimon catalog
---
 .../generated/catalog_configuration.html           |  8 ++-
 .../org/apache/paimon/options/CatalogOptions.java  |  8 ++-
 .../org/apache/paimon/catalog/AbstractCatalog.java | 35 ++++++++++++
 .../apache/paimon/catalog/FileSystemCatalog.java   | 30 ++++++----
 .../java/org/apache/paimon/jdbc/JdbcCatalog.java   | 15 ++---
 .../org/apache/paimon/jdbc/JdbcCatalogFactory.java | 12 +++-
 .../org/apache/paimon/jdbc/JdbcCatalogLock.java    | 40 ++++++-------
 .../services/org.apache.paimon.factories.Factory   |  1 +
 .../org/apache/paimon/jdbc/JdbcCatalogTest.java    |  1 +
 .../paimon/flink/FileSystemCatalogITCase.java      | 65 +++++++++++++++++++++-
 .../services/org.apache.paimon.factories.Factory   |  5 +-
 .../java/org/apache/paimon/hive/HiveCatalog.java   | 27 ++++-----
 .../org/apache/paimon/hive/HiveCatalogLock.java    | 14 ++---
 .../services/org.apache.paimon.factories.Factory   |  3 +
 14 files changed, 197 insertions(+), 67 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html b/docs/layouts/shortcodes/generated/catalog_configuration.html
index e68555944..cab6e731e 100644
--- a/docs/layouts/shortcodes/generated/catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/catalog_configuration.html
@@ -62,11 +62,17 @@ under the License.
             <td>Boolean</td>
             <td>Enable Catalog Lock.</td>
         </tr>
+        <tr>
+            <td><h5>lock.type</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>The Lock Type for Catalog, such as 'hive', 'zookeeper'.</td>
+        </tr>
         <tr>
             <td><h5>metastore</h5></td>
             <td style="word-wrap: break-word;">"filesystem"</td>
             <td>String</td>
-            <td>Metastore of paimon catalog, supports filesystem、hive and 
jdbc.</td>
+            <td>Metastore of paimon catalog, supports filesystem, hive and 
jdbc.</td>
         </tr>
         <tr>
             <td><h5>table.type</h5></td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
index 42cd9e418..f00a35a75 100644
--- a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
@@ -40,7 +40,7 @@ public class CatalogOptions {
                     .stringType()
                     .defaultValue("filesystem")
                     .withDescription(
-                            "Metastore of paimon catalog, supports 
filesystem、hive and jdbc.");
+                            "Metastore of paimon catalog, supports filesystem, 
hive and jdbc.");
 
     public static final ConfigOption<String> URI =
             ConfigOptions.key("uri")
@@ -60,6 +60,12 @@ public class CatalogOptions {
                     .defaultValue(false)
                     .withDescription("Enable Catalog Lock.");
 
+    public static final ConfigOption<String> LOCK_TYPE =
+            ConfigOptions.key("lock.type")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The Lock Type for Catalog, such as 
'hive', 'zookeeper'.");
+
     public static final ConfigOption<Duration> LOCK_CHECK_MAX_SLEEP =
             key("lock-check-max-sleep")
                     .durationType()
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 0dbbb1f40..c69a72b0d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -46,10 +46,13 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.options.CatalogOptions.LINEAGE_META;
+import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
+import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
 import static 
org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
@@ -80,6 +83,30 @@ public abstract class AbstractCatalog implements Catalog {
         this.tableDefaultOptions =
                 convertToPropertiesPrefixKey(options.toMap(), 
TABLE_DEFAULT_OPTION_PREFIX);
         this.catalogOptions = options;
+
+        if (lockEnabled()) {
+            checkArgument(options.contains(LOCK_TYPE), "No lock type when lock 
is enabled.");
+        }
+    }
+
+    @Override
+    public Optional<CatalogLock.LockFactory> lockFactory() {
+        return lockEnabled()
+                ? Optional.of(
+                        FactoryUtil.discoverFactory(
+                                AbstractCatalog.class.getClassLoader(),
+                                CatalogLock.LockFactory.class,
+                                catalogOptions.get(LOCK_TYPE)))
+                : Optional.empty();
+    }
+
+    @Override
+    public Optional<CatalogLock.LockContext> lockContext() {
+        return Optional.of(new OptionLockContext(catalogOptions));
+    }
+
+    protected boolean lockEnabled() {
+        return catalogOptions.get(LOCK_ENABLED);
     }
 
     @Override
@@ -465,4 +492,12 @@ public abstract class AbstractCatalog implements Catalog {
                         "The value of %s property should be %s.",
                         CoreOptions.AUTO_CREATE.key(), Boolean.FALSE));
     }
+
+    static class OptionLockContext implements CatalogLock.LockContext {
+        private final Options catalogOptions;
+
+        public OptionLockContext(Options catalogOptions) {
+            this.catalogOptions = catalogOptions;
+        }
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
index 0c01e9cb7..e71c92dc4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
@@ -21,6 +21,7 @@ package org.apache.paimon.catalog;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.operation.Lock;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
@@ -34,7 +35,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.concurrent.Callable;
 
 import static 
org.apache.paimon.catalog.FileSystemCatalogOptions.CASE_SENSITIVE;
@@ -56,11 +56,6 @@ public class FileSystemCatalog extends AbstractCatalog {
         this.warehouse = warehouse;
     }
 
-    @Override
-    public Optional<CatalogLock.LockFactory> lockFactory() {
-        return Optional.empty();
-    }
-
     @Override
     public List<String> listDatabases() {
         List<String> databases = new ArrayList<>();
@@ -128,8 +123,7 @@ public class FileSystemCatalog extends AbstractCatalog {
 
     @Override
     public TableSchema getDataTableSchema(Identifier identifier) throws 
TableNotExistException {
-        Path path = getDataTableLocation(identifier);
-        return new SchemaManager(fileIO, path)
+        return schemaManager(identifier)
                 .latest()
                 .orElseThrow(() -> new TableNotExistException(identifier));
     }
@@ -142,8 +136,24 @@ public class FileSystemCatalog extends AbstractCatalog {
 
     @Override
     public void createTableImpl(Identifier identifier, Schema schema) {
+        uncheck(() -> schemaManager(identifier).createTable(schema));
+    }
+
+    private SchemaManager schemaManager(Identifier identifier) {
         Path path = getDataTableLocation(identifier);
-        uncheck(() -> new SchemaManager(fileIO, path).createTable(schema));
+        CatalogLock catalogLock =
+                lockFactory()
+                        .map(
+                                fac ->
+                                        fac.create(
+                                                lockContext()
+                                                        .orElseThrow(
+                                                                () ->
+                                                                        new 
RuntimeException(
+                                                                               
 "No lock context when lock is enabled."))))
+                        .orElse(null);
+        return new SchemaManager(fileIO, path)
+                .withLock(catalogLock == null ? null : 
Lock.fromCatalog(catalogLock, identifier));
     }
 
     @Override
@@ -156,7 +166,7 @@ public class FileSystemCatalog extends AbstractCatalog {
     @Override
     protected void alterTableImpl(Identifier identifier, List<SchemaChange> 
changes)
             throws TableNotExistException, ColumnAlreadyExistException, 
ColumnNotExistException {
-        new SchemaManager(fileIO, 
getDataTableLocation(identifier)).commitChanges(changes);
+        schemaManager(identifier).commitChanges(changes);
     }
 
     private static <T> T uncheck(Callable<T> callable) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
index 61dc5959c..689a93ee9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
@@ -26,6 +26,7 @@ import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.operation.Lock;
 import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
@@ -55,7 +56,6 @@ import static org.apache.paimon.jdbc.JdbcCatalogLock.checkMaxSleep;
 import static org.apache.paimon.jdbc.JdbcUtils.execute;
 import static org.apache.paimon.jdbc.JdbcUtils.insertProperties;
 import static org.apache.paimon.jdbc.JdbcUtils.updateTable;
-import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
 
 /* This file is based on source code from the Iceberg Project 
(http://iceberg.apache.org/), licensed by the Apache
  * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
@@ -76,7 +76,7 @@ public class JdbcCatalog extends AbstractCatalog {
 
     protected JdbcCatalog(
             FileIO fileIO, String catalogKey, Map<String, String> config, 
String warehouse) {
-        super(fileIO);
+        super(fileIO, Options.fromMap(config));
         this.catalogKey = catalogKey;
         this.options = config;
         this.warehouse = warehouse;
@@ -347,15 +347,8 @@ public class JdbcCatalog extends AbstractCatalog {
     }
 
     @Override
-    public Optional<CatalogLock.LockFactory> lockFactory() {
-        return lockEnabled()
-                ? Optional.of(JdbcCatalogLock.createFactory(connections, 
catalogKey, options))
-                : Optional.empty();
-    }
-
-    private boolean lockEnabled() {
-        return Boolean.parseBoolean(
-                options.getOrDefault(LOCK_ENABLED.key(), 
LOCK_ENABLED.defaultValue().toString()));
+    public Optional<CatalogLock.LockContext> lockContext() {
+        return Optional.of(new JdbcCatalogLock.JdbcLockContext(connections, 
catalogKey, options));
     }
 
     private Lock lock(Identifier identifier) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java
index ff438a8c8..5e6059232 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java
@@ -23,6 +23,10 @@ import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogFactory;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
+
+import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
+import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
 
 /** Factory to create {@link JdbcCatalog}. */
 public class JdbcCatalogFactory implements CatalogFactory {
@@ -36,7 +40,13 @@ public class JdbcCatalogFactory implements CatalogFactory {
 
     @Override
     public Catalog create(FileIO fileIO, Path warehouse, CatalogContext 
context) {
-        String catalogKey = 
context.options().get(JdbcCatalogOptions.CATALOG_KEY);
+        Options options = context.options();
+        String catalogKey = options.get(JdbcCatalogOptions.CATALOG_KEY);
+        if (options.get(LOCK_ENABLED)) {
+            if (!options.getOptional(LOCK_TYPE).isPresent()) {
+                options.set(LOCK_TYPE, 
JdbcCatalogLock.JdbcCatalogLockFactory.IDENTIFIER);
+            }
+        }
         return new JdbcCatalog(fileIO, catalogKey, context.options().toMap(), 
warehouse.toString());
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java
index 94287cb6e..85f15f7f9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java
@@ -86,26 +86,11 @@ public class JdbcCatalogLock implements CatalogLock {
         // Do nothing
     }
 
-    /** Create a jdbc lock factory. */
-    public static LockFactory createFactory(
-            JdbcClientPool connections, String catalogName, Map<String, 
String> conf) {
-        return new JdbcCatalogLockFactory(connections, catalogName, conf);
-    }
-
-    private static class JdbcCatalogLockFactory implements LockFactory {
+    /** Jdbc catalog lock factory. */
+    public static class JdbcCatalogLockFactory implements LockFactory {
 
         private static final long serialVersionUID = 1L;
-        private static final String IDENTIFIER = "jdbc";
-        private final JdbcClientPool connections;
-        private final String catalogName;
-        private final Map<String, String> conf;
-
-        public JdbcCatalogLockFactory(
-                JdbcClientPool connections, String catalogName, Map<String, 
String> conf) {
-            this.connections = connections;
-            this.catalogName = catalogName;
-            this.conf = conf;
-        }
+        public static final String IDENTIFIER = "jdbc";
 
         @Override
         public String identifier() {
@@ -114,8 +99,25 @@ public class JdbcCatalogLock implements CatalogLock {
 
         @Override
         public CatalogLock create(LockContext context) {
+            JdbcLockContext lockContext = (JdbcLockContext) context;
             return new JdbcCatalogLock(
-                    connections, catalogName, checkMaxSleep(conf), 
acquireTimeout(conf));
+                    lockContext.connections,
+                    lockContext.catalogName,
+                    checkMaxSleep(lockContext.conf),
+                    acquireTimeout(lockContext.conf));
+        }
+    }
+
+    static class JdbcLockContext implements LockContext {
+        private final JdbcClientPool connections;
+        private final String catalogName;
+        private final Map<String, String> conf;
+
+        public JdbcLockContext(
+                JdbcClientPool connections, String catalogName, Map<String, 
String> conf) {
+            this.connections = connections;
+            this.catalogName = catalogName;
+            this.conf = conf;
         }
     }
 
diff --git a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index cc2b3f063..34de4106b 100644
--- a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -15,3 +15,4 @@
 
 org.apache.paimon.catalog.FileSystemCatalogFactory
 org.apache.paimon.jdbc.JdbcCatalogFactory
+org.apache.paimon.jdbc.JdbcCatalogLock$JdbcCatalogLockFactory
diff --git a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
index 5cc79fc85..d03c64bd8 100644
--- a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
@@ -55,6 +55,7 @@ public class JdbcCatalogTest extends CatalogTestBase {
         properties.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password");
         properties.put(CatalogOptions.WAREHOUSE.key(), warehouse);
         properties.put(CatalogOptions.LOCK_ENABLED.key(), "true");
+        properties.put(CatalogOptions.LOCK_TYPE.key(), "jdbc");
         properties.putAll(props);
         JdbcCatalog catalog = new JdbcCatalog(fileIO, "test-jdbc-catalog", 
properties, warehouse);
         assertThat(catalog.warehouse()).isEqualTo(warehouse);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
index 71f93553d..b68d65dd2 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink;
 
 import org.apache.paimon.catalog.AbstractCatalog;
 import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLock;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.util.AbstractTestBase;
 import org.apache.paimon.fs.Path;
@@ -35,15 +36,19 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** ITCase for {@link FlinkCatalog}. */
 public class FileSystemCatalogITCase extends AbstractTestBase {
+    private static final AtomicInteger LOCK_COUNT = new AtomicInteger(0);
 
     private static final String DB_NAME = "default";
 
@@ -113,7 +118,7 @@ public class FileSystemCatalogITCase extends AbstractTestBase {
                                 + "'table-default.opt2'='value2', "
                                 + "'table-default.opt3'='value3', "
                                 + "'fs.allow-hadoop-fallback'='false',"
-                                + "'lock.enabled'='true'"
+                                + "'lock.enabled'='false'"
                                 + ")",
                         path));
         tEnv.useCatalog("fs_with_options");
@@ -146,6 +151,39 @@ public class FileSystemCatalogITCase extends AbstractTestBase {
         assertThat(tableOptions).doesNotContainKey("lock.enabled");
     }
 
+    @Test
+    void testCatalogWithLockForSchema() throws Exception {
+        LOCK_COUNT.set(0);
+        assertThatThrownBy(
+                        () ->
+                                tEnv.executeSql(
+                                                String.format(
+                                                        "CREATE CATALOG 
fs_with_lock WITH ("
+                                                                + 
"'type'='paimon', "
+                                                                + 
"'warehouse'='%s', "
+                                                                + 
"'lock.enabled'='true'"
+                                                                + ")",
+                                                        path))
+                                        .await())
+                .hasRootCauseMessage("No lock type when lock is enabled.");
+        tEnv.executeSql(
+                        String.format(
+                                "CREATE CATALOG fs_with_lock WITH ("
+                                        + "'type'='paimon', "
+                                        + "'warehouse'='%s', "
+                                        + "'lock.enabled'='true',"
+                                        + "'lock.type'='DUMMY'"
+                                        + ")",
+                                path))
+                .await();
+        tEnv.useCatalog("fs_with_lock");
+        tEnv.executeSql("CREATE TABLE table1 (a STRING, b STRING, c 
STRING)").await();
+        tEnv.executeSql("CREATE TABLE table2 (a STRING, b STRING, c 
STRING)").await();
+        tEnv.executeSql("CREATE TABLE table3 (a STRING, b STRING, c 
STRING)").await();
+        tEnv.executeSql("DROP TABLE table3").await();
+        assertThat(LOCK_COUNT.get()).isEqualTo(3);
+    }
+
     private void innerTestWriteRead() throws Exception {
         tEnv.executeSql("INSERT INTO T VALUES ('1', '2', '3'), ('4', '5', 
'6')").await();
         BlockingIterator<Row, Row> iterator =
@@ -163,4 +201,29 @@ public class FileSystemCatalogITCase extends AbstractTestBase {
         }
         return result;
     }
+
+    /** Lock factory for file system catalog. */
+    public static class FileSystemCatalogDummyLockFactory implements 
CatalogLock.LockFactory {
+        private static final String IDENTIFIER = "DUMMY";
+
+        @Override
+        public String identifier() {
+            return IDENTIFIER;
+        }
+
+        @Override
+        public CatalogLock create(CatalogLock.LockContext context) {
+            return new CatalogLock() {
+                @Override
+                public <T> T runWithLock(String database, String table, 
Callable<T> callable)
+                        throws Exception {
+                    LOCK_COUNT.incrementAndGet();
+                    return callable.call();
+                }
+
+                @Override
+                public void close() throws IOException {}
+            };
+        }
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
index 22e88ba48..fcb6fe982 100644
--- a/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ b/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -16,4 +16,7 @@
 org.apache.paimon.flink.FlinkCatalogTest$TestingLogSoreRegisterFactory
 
 # Lineage meta factory
-org.apache.paimon.flink.FlinkLineageITCase$TestingMemoryLineageMetaFactory
\ No newline at end of file
+org.apache.paimon.flink.FlinkLineageITCase$TestingMemoryLineageMetaFactory
+
+# Catalog lock factory
+org.apache.paimon.flink.FileSystemCatalogITCase$FileSystemCatalogDummyLockFactory
\ No newline at end of file
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 024857976..589e92037 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -77,6 +77,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE;
+import static org.apache.paimon.hive.HiveCatalogLock.LOCK_IDENTIFIER;
 import static org.apache.paimon.hive.HiveCatalogLock.acquireTimeout;
 import static org.apache.paimon.hive.HiveCatalogLock.checkMaxSleep;
 import static org.apache.paimon.hive.HiveCatalogOptions.HADOOP_CONF_DIR;
@@ -84,6 +85,7 @@ import static org.apache.paimon.hive.HiveCatalogOptions.HIVE_CONF_DIR;
 import static org.apache.paimon.hive.HiveCatalogOptions.IDENTIFIER;
 import static org.apache.paimon.hive.HiveCatalogOptions.LOCATION_IN_PROPERTIES;
 import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
+import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
 import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE;
 import static 
org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -146,11 +148,6 @@ public class HiveCatalog extends AbstractCatalog {
         this.client = createClient(hiveConf, clientClassName);
     }
 
-    @Override
-    public Optional<CatalogLock.LockFactory> lockFactory() {
-        return lockEnabled() ? Optional.of(HiveCatalogLock.createFactory()) : 
Optional.empty();
-    }
-
     @Override
     public Optional<CatalogLock.LockContext> lockContext() {
         return Optional.of(
@@ -158,11 +155,6 @@ public class HiveCatalog extends AbstractCatalog {
                         new SerializableHiveConf(hiveConf), clientClassName));
     }
 
-    private boolean lockEnabled() {
-        return Boolean.parseBoolean(
-                hiveConf.get(LOCK_ENABLED.key(), 
LOCK_ENABLED.defaultValue().toString()));
-    }
-
     @Override
     public Optional<MetastoreClient.Factory> metastoreClientFactory(Identifier 
identifier) {
         try {
@@ -670,7 +662,8 @@ public class HiveCatalog extends AbstractCatalog {
 
     public static Catalog createHiveCatalog(CatalogContext context) {
         HiveConf hiveConf = createHiveConf(context);
-        String warehouseStr = context.options().get(CatalogOptions.WAREHOUSE);
+        Options options = context.options();
+        String warehouseStr = options.get(CatalogOptions.WAREHOUSE);
         if (warehouseStr == null) {
             warehouseStr =
                     hiveConf.get(METASTOREWAREHOUSE.varname, 
METASTOREWAREHOUSE.defaultStrVal);
@@ -687,11 +680,19 @@ public class HiveCatalog extends AbstractCatalog {
         } catch (IOException e) {
             throw new UncheckedIOException(e);
         }
+
+        /** Hive catalog only support hive lock. */
+        if (options.getOptional(LOCK_ENABLED).orElse(false)) {
+            Optional<String> lockType = options.getOptional(LOCK_TYPE);
+            if (!lockType.isPresent()) {
+                options.set(LOCK_TYPE, LOCK_IDENTIFIER);
+            }
+        }
         return new HiveCatalog(
                 fileIO,
                 hiveConf,
-                
context.options().get(HiveCatalogFactory.METASTORE_CLIENT_CLASS),
-                context.options(),
+                options.get(HiveCatalogFactory.METASTORE_CLIENT_CLASS),
+                options,
                 warehouse.toUri().toString());
     }
 
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java
index 1635d8096..c49cd020c 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java
@@ -44,6 +44,8 @@ import static org.apache.paimon.utils.Preconditions.checkArgument;
 /** Hive {@link CatalogLock}. */
 public class HiveCatalogLock implements CatalogLock {
 
+    static final String LOCK_IDENTIFIER = "hive";
+
     private final IMetaStoreClient client;
     private final long checkMaxSleep;
     private final long acquireTimeout;
@@ -112,17 +114,11 @@ public class HiveCatalogLock implements CatalogLock {
         this.client.close();
     }
 
-    /** Create a hive lock factory. */
-    public static LockFactory createFactory() {
-        return new HiveCatalogLockFactory();
-    }
-
-    private static class HiveCatalogLockFactory implements LockFactory {
+    /** Catalog lock factory for hive. */
+    public static class HiveCatalogLockFactory implements LockFactory {
 
         private static final long serialVersionUID = 1L;
 
-        private static final String IDENTIFIER = "hive";
-
         @Override
         public CatalogLock create(LockContext context) {
             checkArgument(context instanceof HiveLockContext);
@@ -136,7 +132,7 @@ public class HiveCatalogLock implements CatalogLock {
 
         @Override
         public String identifier() {
-            return IDENTIFIER;
+            return LOCK_IDENTIFIER;
         }
     }
 
diff --git a/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index dcc7e6554..d4af13cc0 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ b/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -14,3 +14,6 @@
 # limitations under the License.
 
 org.apache.paimon.hive.HiveCatalogFactory
+
+# Hive catalog lock factory
+org.apache.paimon.hive.HiveCatalogLock$HiveCatalogLockFactory

Reply via email to