This is an automated email from the ASF dual-hosted git repository.
zjureel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 8a0aec65e [core] Support configuring lock in paimon catalog (#2933)
8a0aec65e is described below
commit 8a0aec65ee5c91857fd15d254a34ad0b9daab8f7
Author: Fang Yong <[email protected]>
AuthorDate: Fri Mar 15 16:27:09 2024 +0800
[core] Support configuring lock in paimon catalog (#2933)
* [core] Support configuring lock in paimon catalog
---
.../generated/catalog_configuration.html | 8 ++-
.../org/apache/paimon/options/CatalogOptions.java | 8 ++-
.../org/apache/paimon/catalog/AbstractCatalog.java | 35 ++++++++++++
.../apache/paimon/catalog/FileSystemCatalog.java | 30 ++++++----
.../java/org/apache/paimon/jdbc/JdbcCatalog.java | 15 ++---
.../org/apache/paimon/jdbc/JdbcCatalogFactory.java | 12 +++-
.../org/apache/paimon/jdbc/JdbcCatalogLock.java | 40 ++++++-------
.../services/org.apache.paimon.factories.Factory | 1 +
.../org/apache/paimon/jdbc/JdbcCatalogTest.java | 1 +
.../paimon/flink/FileSystemCatalogITCase.java | 65 +++++++++++++++++++++-
.../services/org.apache.paimon.factories.Factory | 5 +-
.../java/org/apache/paimon/hive/HiveCatalog.java | 27 ++++-----
.../org/apache/paimon/hive/HiveCatalogLock.java | 14 ++---
.../services/org.apache.paimon.factories.Factory | 3 +
14 files changed, 197 insertions(+), 67 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html
b/docs/layouts/shortcodes/generated/catalog_configuration.html
index e68555944..cab6e731e 100644
--- a/docs/layouts/shortcodes/generated/catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/catalog_configuration.html
@@ -62,11 +62,17 @@ under the License.
<td>Boolean</td>
<td>Enable Catalog Lock.</td>
</tr>
+ <tr>
+ <td><h5>lock.type</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The Lock Type for Catalog, such as 'hive', 'zookeeper'.</td>
+ </tr>
<tr>
<td><h5>metastore</h5></td>
<td style="word-wrap: break-word;">"filesystem"</td>
<td>String</td>
- <td>Metastore of paimon catalog, supports filesystem、hive and
jdbc.</td>
+ <td>Metastore of paimon catalog, supports filesystem, hive and
jdbc.</td>
</tr>
<tr>
<td><h5>table.type</h5></td>
diff --git
a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
index 42cd9e418..f00a35a75 100644
--- a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
@@ -40,7 +40,7 @@ public class CatalogOptions {
.stringType()
.defaultValue("filesystem")
.withDescription(
- "Metastore of paimon catalog, supports
filesystem、hive and jdbc.");
+ "Metastore of paimon catalog, supports filesystem,
hive and jdbc.");
public static final ConfigOption<String> URI =
ConfigOptions.key("uri")
@@ -60,6 +60,12 @@ public class CatalogOptions {
.defaultValue(false)
.withDescription("Enable Catalog Lock.");
+ public static final ConfigOption<String> LOCK_TYPE =
+ ConfigOptions.key("lock.type")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The Lock Type for Catalog, such as
'hive', 'zookeeper'.");
+
public static final ConfigOption<Duration> LOCK_CHECK_MAX_SLEEP =
key("lock-check-max-sleep")
.durationType()
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 0dbbb1f40..c69a72b0d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -46,10 +46,13 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import static org.apache.paimon.options.CatalogOptions.LINEAGE_META;
+import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
+import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
import static
org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -80,6 +83,30 @@ public abstract class AbstractCatalog implements Catalog {
this.tableDefaultOptions =
convertToPropertiesPrefixKey(options.toMap(),
TABLE_DEFAULT_OPTION_PREFIX);
this.catalogOptions = options;
+
+ if (lockEnabled()) {
+ checkArgument(options.contains(LOCK_TYPE), "No lock type when lock
is enabled.");
+ }
+ }
+
+ @Override
+ public Optional<CatalogLock.LockFactory> lockFactory() {
+ return lockEnabled()
+ ? Optional.of(
+ FactoryUtil.discoverFactory(
+ AbstractCatalog.class.getClassLoader(),
+ CatalogLock.LockFactory.class,
+ catalogOptions.get(LOCK_TYPE)))
+ : Optional.empty();
+ }
+
+ @Override
+ public Optional<CatalogLock.LockContext> lockContext() {
+ return Optional.of(new OptionLockContext(catalogOptions));
+ }
+
+ protected boolean lockEnabled() {
+ return catalogOptions.get(LOCK_ENABLED);
}
@Override
@@ -465,4 +492,12 @@ public abstract class AbstractCatalog implements Catalog {
"The value of %s property should be %s.",
CoreOptions.AUTO_CREATE.key(), Boolean.FALSE));
}
+
+ static class OptionLockContext implements CatalogLock.LockContext {
+ private final Options catalogOptions;
+
+ public OptionLockContext(Options catalogOptions) {
+ this.catalogOptions = catalogOptions;
+ }
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
index 0c01e9cb7..e71c92dc4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
@@ -21,6 +21,7 @@ package org.apache.paimon.catalog;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
@@ -34,7 +35,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.concurrent.Callable;
import static
org.apache.paimon.catalog.FileSystemCatalogOptions.CASE_SENSITIVE;
@@ -56,11 +56,6 @@ public class FileSystemCatalog extends AbstractCatalog {
this.warehouse = warehouse;
}
- @Override
- public Optional<CatalogLock.LockFactory> lockFactory() {
- return Optional.empty();
- }
-
@Override
public List<String> listDatabases() {
List<String> databases = new ArrayList<>();
@@ -128,8 +123,7 @@ public class FileSystemCatalog extends AbstractCatalog {
@Override
public TableSchema getDataTableSchema(Identifier identifier) throws
TableNotExistException {
- Path path = getDataTableLocation(identifier);
- return new SchemaManager(fileIO, path)
+ return schemaManager(identifier)
.latest()
.orElseThrow(() -> new TableNotExistException(identifier));
}
@@ -142,8 +136,24 @@ public class FileSystemCatalog extends AbstractCatalog {
@Override
public void createTableImpl(Identifier identifier, Schema schema) {
+ uncheck(() -> schemaManager(identifier).createTable(schema));
+ }
+
+ private SchemaManager schemaManager(Identifier identifier) {
Path path = getDataTableLocation(identifier);
- uncheck(() -> new SchemaManager(fileIO, path).createTable(schema));
+ CatalogLock catalogLock =
+ lockFactory()
+ .map(
+ fac ->
+ fac.create(
+ lockContext()
+ .orElseThrow(
+ () ->
+ new
RuntimeException(
+
"No lock context when lock is enabled."))))
+ .orElse(null);
+ return new SchemaManager(fileIO, path)
+ .withLock(catalogLock == null ? null :
Lock.fromCatalog(catalogLock, identifier));
}
@Override
@@ -156,7 +166,7 @@ public class FileSystemCatalog extends AbstractCatalog {
@Override
protected void alterTableImpl(Identifier identifier, List<SchemaChange>
changes)
throws TableNotExistException, ColumnAlreadyExistException,
ColumnNotExistException {
- new SchemaManager(fileIO,
getDataTableLocation(identifier)).commitChanges(changes);
+ schemaManager(identifier).commitChanges(changes);
}
private static <T> T uncheck(Callable<T> callable) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
index 61dc5959c..689a93ee9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
@@ -26,6 +26,7 @@ import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
@@ -55,7 +56,6 @@ import static
org.apache.paimon.jdbc.JdbcCatalogLock.checkMaxSleep;
import static org.apache.paimon.jdbc.JdbcUtils.execute;
import static org.apache.paimon.jdbc.JdbcUtils.insertProperties;
import static org.apache.paimon.jdbc.JdbcUtils.updateTable;
-import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
/* This file is based on source code from the Iceberg Project
(http://iceberg.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the
NOTICE file distributed with this work for
@@ -76,7 +76,7 @@ public class JdbcCatalog extends AbstractCatalog {
protected JdbcCatalog(
FileIO fileIO, String catalogKey, Map<String, String> config,
String warehouse) {
- super(fileIO);
+ super(fileIO, Options.fromMap(config));
this.catalogKey = catalogKey;
this.options = config;
this.warehouse = warehouse;
@@ -347,15 +347,8 @@ public class JdbcCatalog extends AbstractCatalog {
}
@Override
- public Optional<CatalogLock.LockFactory> lockFactory() {
- return lockEnabled()
- ? Optional.of(JdbcCatalogLock.createFactory(connections,
catalogKey, options))
- : Optional.empty();
- }
-
- private boolean lockEnabled() {
- return Boolean.parseBoolean(
- options.getOrDefault(LOCK_ENABLED.key(),
LOCK_ENABLED.defaultValue().toString()));
+ public Optional<CatalogLock.LockContext> lockContext() {
+ return Optional.of(new JdbcCatalogLock.JdbcLockContext(connections,
catalogKey, options));
}
private Lock lock(Identifier identifier) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java
b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java
index ff438a8c8..5e6059232 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java
@@ -23,6 +23,10 @@ import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
+
+import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
+import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
/** Factory to create {@link JdbcCatalog}. */
public class JdbcCatalogFactory implements CatalogFactory {
@@ -36,7 +40,13 @@ public class JdbcCatalogFactory implements CatalogFactory {
@Override
public Catalog create(FileIO fileIO, Path warehouse, CatalogContext
context) {
- String catalogKey =
context.options().get(JdbcCatalogOptions.CATALOG_KEY);
+ Options options = context.options();
+ String catalogKey = options.get(JdbcCatalogOptions.CATALOG_KEY);
+ if (options.get(LOCK_ENABLED)) {
+ if (!options.getOptional(LOCK_TYPE).isPresent()) {
+ options.set(LOCK_TYPE,
JdbcCatalogLock.JdbcCatalogLockFactory.IDENTIFIER);
+ }
+ }
return new JdbcCatalog(fileIO, catalogKey, context.options().toMap(),
warehouse.toString());
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java
b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java
index 94287cb6e..85f15f7f9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java
@@ -86,26 +86,11 @@ public class JdbcCatalogLock implements CatalogLock {
// Do nothing
}
- /** Create a jdbc lock factory. */
- public static LockFactory createFactory(
- JdbcClientPool connections, String catalogName, Map<String,
String> conf) {
- return new JdbcCatalogLockFactory(connections, catalogName, conf);
- }
-
- private static class JdbcCatalogLockFactory implements LockFactory {
+ /** Jdbc catalog lock factory. */
+ public static class JdbcCatalogLockFactory implements LockFactory {
private static final long serialVersionUID = 1L;
- private static final String IDENTIFIER = "jdbc";
- private final JdbcClientPool connections;
- private final String catalogName;
- private final Map<String, String> conf;
-
- public JdbcCatalogLockFactory(
- JdbcClientPool connections, String catalogName, Map<String,
String> conf) {
- this.connections = connections;
- this.catalogName = catalogName;
- this.conf = conf;
- }
+ public static final String IDENTIFIER = "jdbc";
@Override
public String identifier() {
@@ -114,8 +99,25 @@ public class JdbcCatalogLock implements CatalogLock {
@Override
public CatalogLock create(LockContext context) {
+ JdbcLockContext lockContext = (JdbcLockContext) context;
return new JdbcCatalogLock(
- connections, catalogName, checkMaxSleep(conf),
acquireTimeout(conf));
+ lockContext.connections,
+ lockContext.catalogName,
+ checkMaxSleep(lockContext.conf),
+ acquireTimeout(lockContext.conf));
+ }
+ }
+
+ static class JdbcLockContext implements LockContext {
+ private final JdbcClientPool connections;
+ private final String catalogName;
+ private final Map<String, String> conf;
+
+ public JdbcLockContext(
+ JdbcClientPool connections, String catalogName, Map<String,
String> conf) {
+ this.connections = connections;
+ this.catalogName = catalogName;
+ this.conf = conf;
}
}
diff --git
a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index cc2b3f063..34de4106b 100644
---
a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++
b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -15,3 +15,4 @@
org.apache.paimon.catalog.FileSystemCatalogFactory
org.apache.paimon.jdbc.JdbcCatalogFactory
+org.apache.paimon.jdbc.JdbcCatalogLock$JdbcCatalogLockFactory
diff --git
a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
index 5cc79fc85..d03c64bd8 100644
--- a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
@@ -55,6 +55,7 @@ public class JdbcCatalogTest extends CatalogTestBase {
properties.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password");
properties.put(CatalogOptions.WAREHOUSE.key(), warehouse);
properties.put(CatalogOptions.LOCK_ENABLED.key(), "true");
+ properties.put(CatalogOptions.LOCK_TYPE.key(), "jdbc");
properties.putAll(props);
JdbcCatalog catalog = new JdbcCatalog(fileIO, "test-jdbc-catalog",
properties, warehouse);
assertThat(catalog.warehouse()).isEqualTo(warehouse);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
index 71f93553d..b68d65dd2 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink;
import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLock;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.fs.Path;
@@ -35,15 +36,19 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.File;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** ITCase for {@link FlinkCatalog}. */
public class FileSystemCatalogITCase extends AbstractTestBase {
+ private static final AtomicInteger LOCK_COUNT = new AtomicInteger(0);
private static final String DB_NAME = "default";
@@ -113,7 +118,7 @@ public class FileSystemCatalogITCase extends
AbstractTestBase {
+ "'table-default.opt2'='value2', "
+ "'table-default.opt3'='value3', "
+ "'fs.allow-hadoop-fallback'='false',"
- + "'lock.enabled'='true'"
+ + "'lock.enabled'='false'"
+ ")",
path));
tEnv.useCatalog("fs_with_options");
@@ -146,6 +151,39 @@ public class FileSystemCatalogITCase extends
AbstractTestBase {
assertThat(tableOptions).doesNotContainKey("lock.enabled");
}
+ @Test
+ void testCatalogWithLockForSchema() throws Exception {
+ LOCK_COUNT.set(0);
+ assertThatThrownBy(
+ () ->
+ tEnv.executeSql(
+ String.format(
+ "CREATE CATALOG
fs_with_lock WITH ("
+ +
"'type'='paimon', "
+ +
"'warehouse'='%s', "
+ +
"'lock.enabled'='true'"
+ + ")",
+ path))
+ .await())
+ .hasRootCauseMessage("No lock type when lock is enabled.");
+ tEnv.executeSql(
+ String.format(
+ "CREATE CATALOG fs_with_lock WITH ("
+ + "'type'='paimon', "
+ + "'warehouse'='%s', "
+ + "'lock.enabled'='true',"
+ + "'lock.type'='DUMMY'"
+ + ")",
+ path))
+ .await();
+ tEnv.useCatalog("fs_with_lock");
+ tEnv.executeSql("CREATE TABLE table1 (a STRING, b STRING, c
STRING)").await();
+ tEnv.executeSql("CREATE TABLE table2 (a STRING, b STRING, c
STRING)").await();
+ tEnv.executeSql("CREATE TABLE table3 (a STRING, b STRING, c
STRING)").await();
+ tEnv.executeSql("DROP TABLE table3").await();
+ assertThat(LOCK_COUNT.get()).isEqualTo(3);
+ }
+
private void innerTestWriteRead() throws Exception {
tEnv.executeSql("INSERT INTO T VALUES ('1', '2', '3'), ('4', '5',
'6')").await();
BlockingIterator<Row, Row> iterator =
@@ -163,4 +201,29 @@ public class FileSystemCatalogITCase extends
AbstractTestBase {
}
return result;
}
+
+ /** Lock factory for file system catalog. */
+ public static class FileSystemCatalogDummyLockFactory implements
CatalogLock.LockFactory {
+ private static final String IDENTIFIER = "DUMMY";
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public CatalogLock create(CatalogLock.LockContext context) {
+ return new CatalogLock() {
+ @Override
+ public <T> T runWithLock(String database, String table,
Callable<T> callable)
+ throws Exception {
+ LOCK_COUNT.incrementAndGet();
+ return callable.call();
+ }
+
+ @Override
+ public void close() throws IOException {}
+ };
+ }
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
b/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
index 22e88ba48..fcb6fe982 100644
---
a/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
+++
b/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -16,4 +16,7 @@
org.apache.paimon.flink.FlinkCatalogTest$TestingLogSoreRegisterFactory
# Lineage meta factory
-org.apache.paimon.flink.FlinkLineageITCase$TestingMemoryLineageMetaFactory
\ No newline at end of file
+org.apache.paimon.flink.FlinkLineageITCase$TestingMemoryLineageMetaFactory
+
+# Catalog lock factory
+org.apache.paimon.flink.FileSystemCatalogITCase$FileSystemCatalogDummyLockFactory
\ No newline at end of file
diff --git
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 024857976..589e92037 100644
---
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -77,6 +77,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE;
+import static org.apache.paimon.hive.HiveCatalogLock.LOCK_IDENTIFIER;
import static org.apache.paimon.hive.HiveCatalogLock.acquireTimeout;
import static org.apache.paimon.hive.HiveCatalogLock.checkMaxSleep;
import static org.apache.paimon.hive.HiveCatalogOptions.HADOOP_CONF_DIR;
@@ -84,6 +85,7 @@ import static
org.apache.paimon.hive.HiveCatalogOptions.HIVE_CONF_DIR;
import static org.apache.paimon.hive.HiveCatalogOptions.IDENTIFIER;
import static org.apache.paimon.hive.HiveCatalogOptions.LOCATION_IN_PROPERTIES;
import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
+import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE;
import static
org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -146,11 +148,6 @@ public class HiveCatalog extends AbstractCatalog {
this.client = createClient(hiveConf, clientClassName);
}
- @Override
- public Optional<CatalogLock.LockFactory> lockFactory() {
- return lockEnabled() ? Optional.of(HiveCatalogLock.createFactory()) :
Optional.empty();
- }
-
@Override
public Optional<CatalogLock.LockContext> lockContext() {
return Optional.of(
@@ -158,11 +155,6 @@ public class HiveCatalog extends AbstractCatalog {
new SerializableHiveConf(hiveConf), clientClassName));
}
- private boolean lockEnabled() {
- return Boolean.parseBoolean(
- hiveConf.get(LOCK_ENABLED.key(),
LOCK_ENABLED.defaultValue().toString()));
- }
-
@Override
public Optional<MetastoreClient.Factory> metastoreClientFactory(Identifier
identifier) {
try {
@@ -670,7 +662,8 @@ public class HiveCatalog extends AbstractCatalog {
public static Catalog createHiveCatalog(CatalogContext context) {
HiveConf hiveConf = createHiveConf(context);
- String warehouseStr = context.options().get(CatalogOptions.WAREHOUSE);
+ Options options = context.options();
+ String warehouseStr = options.get(CatalogOptions.WAREHOUSE);
if (warehouseStr == null) {
warehouseStr =
hiveConf.get(METASTOREWAREHOUSE.varname,
METASTOREWAREHOUSE.defaultStrVal);
@@ -687,11 +680,19 @@ public class HiveCatalog extends AbstractCatalog {
} catch (IOException e) {
throw new UncheckedIOException(e);
}
+
+ /** Hive catalog only support hive lock. */
+ if (options.getOptional(LOCK_ENABLED).orElse(false)) {
+ Optional<String> lockType = options.getOptional(LOCK_TYPE);
+ if (!lockType.isPresent()) {
+ options.set(LOCK_TYPE, LOCK_IDENTIFIER);
+ }
+ }
return new HiveCatalog(
fileIO,
hiveConf,
-
context.options().get(HiveCatalogFactory.METASTORE_CLIENT_CLASS),
- context.options(),
+ options.get(HiveCatalogFactory.METASTORE_CLIENT_CLASS),
+ options,
warehouse.toUri().toString());
}
diff --git
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java
index 1635d8096..c49cd020c 100644
---
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java
+++
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java
@@ -44,6 +44,8 @@ import static
org.apache.paimon.utils.Preconditions.checkArgument;
/** Hive {@link CatalogLock}. */
public class HiveCatalogLock implements CatalogLock {
+ static final String LOCK_IDENTIFIER = "hive";
+
private final IMetaStoreClient client;
private final long checkMaxSleep;
private final long acquireTimeout;
@@ -112,17 +114,11 @@ public class HiveCatalogLock implements CatalogLock {
this.client.close();
}
- /** Create a hive lock factory. */
- public static LockFactory createFactory() {
- return new HiveCatalogLockFactory();
- }
-
- private static class HiveCatalogLockFactory implements LockFactory {
+ /** Catalog lock factory for hive. */
+ public static class HiveCatalogLockFactory implements LockFactory {
private static final long serialVersionUID = 1L;
- private static final String IDENTIFIER = "hive";
-
@Override
public CatalogLock create(LockContext context) {
checkArgument(context instanceof HiveLockContext);
@@ -136,7 +132,7 @@ public class HiveCatalogLock implements CatalogLock {
@Override
public String identifier() {
- return IDENTIFIER;
+ return LOCK_IDENTIFIER;
}
}
diff --git
a/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
b/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index dcc7e6554..d4af13cc0 100644
---
a/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++
b/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -14,3 +14,6 @@
# limitations under the License.
org.apache.paimon.hive.HiveCatalogFactory
+
+# Hive catalog lock factory
+org.apache.paimon.hive.HiveCatalogLock$HiveCatalogLockFactory