This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new cfeb22ff3 [catalog] Refactor Catalog Factory and revert jdbc lock in
FileSystemCatalog (#3099)
cfeb22ff3 is described below
commit cfeb22ff3b985a78a99fa71ceecc3fe808ef9440
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Mar 27 13:07:31 2024 +0800
[catalog] Refactor Catalog Factory and revert jdbc lock in
FileSystemCatalog (#3099)
---
docs/content/how-to/creating-catalogs.md | 10 ---
.../org/apache/paimon/catalog/AbstractCatalog.java | 39 ++++-----
.../java/org/apache/paimon/catalog/Catalog.java | 4 +-
.../org/apache/paimon/catalog/CatalogLock.java | 10 ---
.../{CatalogLock.java => CatalogLockContext.java} | 25 ++----
.../{CatalogLock.java => CatalogLockFactory.java} | 23 +----
.../apache/paimon/catalog/FileSystemCatalog.java | 18 +---
.../apache/paimon/catalog/LockContextUtils.java | 98 ----------------------
.../java/org/apache/paimon/jdbc/JdbcCatalog.java | 12 ++-
.../org/apache/paimon/jdbc/JdbcCatalogFactory.java | 8 --
.../org/apache/paimon/jdbc/JdbcCatalogLock.java | 36 --------
.../apache/paimon/jdbc/JdbcCatalogLockContext.java | 49 +++++++++++
.../apache/paimon/jdbc/JdbcCatalogLockFactory.java | 52 ++++++++++++
.../java/org/apache/paimon/operation/Lock.java | 22 ++---
.../services/org.apache.paimon.factories.Factory | 2 +-
.../paimon/catalog/FileSystemCatalogTest.java | 68 ---------------
.../paimon/flink/FileSystemCatalogITCase.java | 19 ++---
.../java/org/apache/paimon/hive/HiveCatalog.java | 32 +++----
.../org/apache/paimon/hive/HiveCatalogLock.java | 33 --------
.../apache/paimon/hive/HiveCatalogLockContext.java | 50 +++++++++++
.../apache/paimon/hive/HiveCatalogLockFactory.java | 52 ++++++++++++
.../services/org.apache.paimon.factories.Factory | 4 +-
.../apache/paimon/hive/HiveCatalogITCaseBase.java | 6 +-
23 files changed, 278 insertions(+), 394 deletions(-)
diff --git a/docs/content/how-to/creating-catalogs.md
b/docs/content/how-to/creating-catalogs.md
index fc2927d41..536a6b165 100644
--- a/docs/content/how-to/creating-catalogs.md
+++ b/docs/content/how-to/creating-catalogs.md
@@ -53,16 +53,6 @@ USE CATALOG my_catalog;
You can define any default table options with the prefix `table-default.` for
tables created in the catalog.
-The FileSystem catalog supports jdbc lock and can take effect through the
following configuration:
-
-> ```shell
-> 'uri' = 'jdbc:mysql://<host>:<port>/<databaseName>'
-> 'jdbc.user' = '...',
-> 'jdbc.password' = '...',
-> ```
-
-
-
{{< /tab >}}
{{< tab "Spark3" >}}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index c69a72b0d..4a4fb04fd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -83,26 +83,31 @@ public abstract class AbstractCatalog implements Catalog {
this.tableDefaultOptions =
convertToPropertiesPrefixKey(options.toMap(),
TABLE_DEFAULT_OPTION_PREFIX);
this.catalogOptions = options;
+ }
- if (lockEnabled()) {
- checkArgument(options.contains(LOCK_TYPE), "No lock type when lock
is enabled.");
+ @Override
+ public Optional<CatalogLockFactory> lockFactory() {
+ if (!lockEnabled()) {
+ return Optional.empty();
}
+
+ String lock = catalogOptions.get(LOCK_TYPE);
+ if (lock == null) {
+ return defaultLockFactory();
+ }
+
+ return Optional.of(
+ FactoryUtil.discoverFactory(
+ AbstractCatalog.class.getClassLoader(),
CatalogLockFactory.class, lock));
}
- @Override
- public Optional<CatalogLock.LockFactory> lockFactory() {
- return lockEnabled()
- ? Optional.of(
- FactoryUtil.discoverFactory(
- AbstractCatalog.class.getClassLoader(),
- CatalogLock.LockFactory.class,
- catalogOptions.get(LOCK_TYPE)))
- : Optional.empty();
+ public Optional<CatalogLockFactory> defaultLockFactory() {
+ return Optional.empty();
}
@Override
- public Optional<CatalogLock.LockContext> lockContext() {
- return Optional.of(new OptionLockContext(catalogOptions));
+ public Optional<CatalogLockContext> lockContext() {
+ return Optional.of(CatalogLockContext.fromOptions(catalogOptions));
}
protected boolean lockEnabled() {
@@ -492,12 +497,4 @@ public abstract class AbstractCatalog implements Catalog {
"The value of %s property should be %s.",
CoreOptions.AUTO_CREATE.key(), Boolean.FALSE));
}
-
- static class OptionLockContext implements CatalogLock.LockContext {
- private final Options catalogOptions;
-
- public OptionLockContext(Options catalogOptions) {
- this.catalogOptions = catalogOptions;
- }
- }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index 0168891b0..99b71e8de 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -49,10 +49,10 @@ public interface Catalog extends AutoCloseable {
* Get lock factory from catalog. Lock is used to support multiple
concurrent writes on the
* object store.
*/
- Optional<CatalogLock.LockFactory> lockFactory();
+ Optional<CatalogLockFactory> lockFactory();
/** Get lock context for lock factory to create a lock. */
- default Optional<CatalogLock.LockContext> lockContext() {
+ default Optional<CatalogLockContext> lockContext() {
return Optional.empty();
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLock.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLock.java
index 0e547037e..4f147c6aa 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLock.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLock.java
@@ -19,10 +19,8 @@
package org.apache.paimon.catalog;
import org.apache.paimon.annotation.Public;
-import org.apache.paimon.factories.Factory;
import java.io.Closeable;
-import java.io.Serializable;
import java.util.concurrent.Callable;
/**
@@ -35,12 +33,4 @@ public interface CatalogLock extends Closeable {
/** Run with catalog lock. The caller should tell catalog the database and
table name. */
<T> T runWithLock(String database, String table, Callable<T> callable)
throws Exception;
-
- /** Factory to create {@link CatalogLock}. */
- interface LockFactory extends Factory, Serializable {
- CatalogLock create(LockContext context);
- }
-
- /** Context for lock factory to create lock. */
- interface LockContext extends Serializable {}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLock.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLockContext.java
similarity index 53%
copy from paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLock.java
copy to
paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLockContext.java
index 0e547037e..442409c81 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLock.java
+++
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLockContext.java
@@ -18,29 +18,16 @@
package org.apache.paimon.catalog;
-import org.apache.paimon.annotation.Public;
-import org.apache.paimon.factories.Factory;
+import org.apache.paimon.options.Options;
-import java.io.Closeable;
import java.io.Serializable;
-import java.util.concurrent.Callable;
-/**
- * An interface that allows source and sink to use global lock to some
transaction-related things.
- *
- * @since 0.4.0
- */
-@Public
-public interface CatalogLock extends Closeable {
+/** Context for lock factory to create lock. */
+public interface CatalogLockContext extends Serializable {
- /** Run with catalog lock. The caller should tell catalog the database and
table name. */
- <T> T runWithLock(String database, String table, Callable<T> callable)
throws Exception;
+ Options options();
- /** Factory to create {@link CatalogLock}. */
- interface LockFactory extends Factory, Serializable {
- CatalogLock create(LockContext context);
+ static CatalogLockContext fromOptions(Options options) {
+ return () -> options;
}
-
- /** Context for lock factory to create lock. */
- interface LockContext extends Serializable {}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLock.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLockFactory.java
similarity index 55%
copy from paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLock.java
copy to
paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLockFactory.java
index 0e547037e..d964ebda4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLock.java
+++
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLockFactory.java
@@ -18,29 +18,12 @@
package org.apache.paimon.catalog;
-import org.apache.paimon.annotation.Public;
import org.apache.paimon.factories.Factory;
-import java.io.Closeable;
import java.io.Serializable;
-import java.util.concurrent.Callable;
-/**
- * An interface that allows source and sink to use global lock to some
transaction-related things.
- *
- * @since 0.4.0
- */
-@Public
-public interface CatalogLock extends Closeable {
-
- /** Run with catalog lock. The caller should tell catalog the database and
table name. */
- <T> T runWithLock(String database, String table, Callable<T> callable)
throws Exception;
-
- /** Factory to create {@link CatalogLock}. */
- interface LockFactory extends Factory, Serializable {
- CatalogLock create(LockContext context);
- }
+/** Factory to create {@link CatalogLock}. */
+public interface CatalogLockFactory extends Factory, Serializable {
- /** Context for lock factory to create lock. */
- interface LockContext extends Serializable {}
+ CatalogLock createLock(CatalogLockContext context);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
index 2c1d1e8df..8ffe0f271 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
@@ -18,7 +18,6 @@
package org.apache.paimon.catalog;
-import org.apache.paimon.client.ClientPool;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
@@ -36,7 +35,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.concurrent.Callable;
import static
org.apache.paimon.catalog.FileSystemCatalogOptions.CASE_SENSITIVE;
@@ -48,8 +46,6 @@ public class FileSystemCatalog extends AbstractCatalog {
private final Path warehouse;
- private ClientPool.ClientPoolImpl clientPool;
-
public FileSystemCatalog(FileIO fileIO, Path warehouse) {
super(fileIO);
this.warehouse = warehouse;
@@ -149,7 +145,7 @@ public class FileSystemCatalog extends AbstractCatalog {
lockFactory()
.map(
fac ->
- fac.create(
+ fac.createLock(
lockContext()
.orElseThrow(
() ->
@@ -160,14 +156,6 @@ public class FileSystemCatalog extends AbstractCatalog {
.withLock(catalogLock == null ? null :
Lock.fromCatalog(catalogLock, identifier));
}
- @Override
- public Optional<CatalogLock.LockContext> lockContext() {
- if (clientPool == null) {
- this.clientPool =
LockContextUtils.tryInitializeClientPool(catalogOptions);
- }
- return LockContextUtils.lockContext(this.clientPool, catalogOptions,
"filesystem");
- }
-
@Override
public void renameTableImpl(Identifier fromTable, Identifier toTable) {
Path fromPath = getDataTableLocation(fromTable);
@@ -199,9 +187,7 @@ public class FileSystemCatalog extends AbstractCatalog {
}
@Override
- public void close() throws Exception {
- LockContextUtils.close(clientPool);
- }
+ public void close() throws Exception {}
@Override
public String warehouse() {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/LockContextUtils.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/LockContextUtils.java
deleted file mode 100644
index 699c54de4..000000000
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/LockContextUtils.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.catalog;
-
-import org.apache.paimon.client.ClientPool;
-import org.apache.paimon.jdbc.JdbcCatalogFactory;
-import org.apache.paimon.jdbc.JdbcCatalogLock;
-import org.apache.paimon.jdbc.JdbcClientPool;
-import org.apache.paimon.jdbc.JdbcUtils;
-import org.apache.paimon.options.CatalogOptions;
-import org.apache.paimon.options.Options;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.sql.SQLException;
-import java.util.Optional;
-
-/** Utils for {@link org.apache.paimon.catalog.CatalogLock.LockContext}. */
-public class LockContextUtils {
-
- private static final Logger LOG =
LoggerFactory.getLogger(FileSystemCatalog.class);
-
- public static Optional<CatalogLock.LockContext> lockContext(
- ClientPool.ClientPoolImpl clientPool, Options catalogOptions,
String catalogKey) {
- if (clientPool == null) {
- return Optional.of(new
AbstractCatalog.OptionLockContext(catalogOptions));
- }
- String lockType = catalogOptions.get(CatalogOptions.LOCK_TYPE);
- switch (lockType) {
- case JdbcCatalogFactory.IDENTIFIER:
- JdbcClientPool connections = (JdbcClientPool) clientPool;
- return Optional.of(
- new JdbcCatalogLock.JdbcLockContext(
- connections, catalogKey, catalogOptions));
- default:
- LOG.warn("Unsupported lock type:" + lockType);
- return Optional.of(new
AbstractCatalog.OptionLockContext(catalogOptions));
- }
- }
-
- public static ClientPool.ClientPoolImpl tryInitializeClientPool(Options
catalogOptions) {
- String lockType = catalogOptions.get(CatalogOptions.LOCK_TYPE);
- if (lockType == null) {
- return null;
- }
- switch (lockType) {
- case JdbcCatalogFactory.IDENTIFIER:
- JdbcClientPool connections =
- new JdbcClientPool(
-
catalogOptions.get(CatalogOptions.CLIENT_POOL_SIZE),
- catalogOptions.get(CatalogOptions.URI.key()),
- catalogOptions.toMap());
- try {
- JdbcUtils.createDistributedLockTable(connections,
catalogOptions);
- } catch (SQLException e) {
- throw new RuntimeException("Cannot initialize JDBC
distributed lock.", e);
- } catch (InterruptedException e) {
- throw new RuntimeException("Interrupted in call to
initialize", e);
- }
- return connections;
- default:
- LOG.warn("Unsupported lock type:" + lockType);
- return null;
- }
- }
-
- public static void close(ClientPool.ClientPoolImpl clientPool) {
- if (clientPool == null) {
- return;
- }
- if (clientPool instanceof JdbcClientPool) {
- JdbcClientPool connections = (JdbcClientPool) clientPool;
- if (!connections.isClosed()) {
- connections.close();
- }
- } else {
- clientPool.close();
- }
- clientPool = null;
- }
-}
diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
index 91c11ac24..7e7718b5b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
@@ -20,7 +20,8 @@ package org.apache.paimon.jdbc;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.AbstractCatalog;
-import org.apache.paimon.catalog.CatalogLock;
+import org.apache.paimon.catalog.CatalogLockContext;
+import org.apache.paimon.catalog.CatalogLockFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
@@ -343,8 +344,13 @@ public class JdbcCatalog extends AbstractCatalog {
}
@Override
- public Optional<CatalogLock.LockContext> lockContext() {
- return Optional.of(new JdbcCatalogLock.JdbcLockContext(connections,
catalogKey, options));
+ public Optional<CatalogLockFactory> defaultLockFactory() {
+ return Optional.of(new JdbcCatalogLockFactory());
+ }
+
+ @Override
+ public Optional<CatalogLockContext> lockContext() {
+ return Optional.of(new JdbcCatalogLockContext(connections, catalogKey,
options));
}
private Lock lock(Identifier identifier) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java
b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java
index adaaf3f43..6c3c1d0e4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java
@@ -25,9 +25,6 @@ import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
-import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
-import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
-
/** Factory to create {@link JdbcCatalog}. */
public class JdbcCatalogFactory implements CatalogFactory {
@@ -42,11 +39,6 @@ public class JdbcCatalogFactory implements CatalogFactory {
public Catalog create(FileIO fileIO, Path warehouse, CatalogContext
context) {
Options options = context.options();
String catalogKey = options.get(JdbcCatalogOptions.CATALOG_KEY);
- if (options.get(LOCK_ENABLED)) {
- if (!options.getOptional(LOCK_TYPE).isPresent()) {
- options.set(LOCK_TYPE,
JdbcCatalogLock.JdbcCatalogLockFactory.IDENTIFIER);
- }
- }
return new JdbcCatalog(fileIO, catalogKey, context.options(),
warehouse.toString());
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java
b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java
index 2d409f12a..307f92f0a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java
@@ -19,7 +19,6 @@
package org.apache.paimon.jdbc;
import org.apache.paimon.catalog.CatalogLock;
-import org.apache.paimon.options.Options;
import org.apache.paimon.utils.TimeUtils;
import java.io.IOException;
@@ -87,41 +86,6 @@ public class JdbcCatalogLock implements CatalogLock {
// Do nothing
}
- /** Jdbc catalog lock factory. */
- public static class JdbcCatalogLockFactory implements LockFactory {
-
- private static final long serialVersionUID = 1L;
- public static final String IDENTIFIER = "jdbc";
-
- @Override
- public String identifier() {
- return IDENTIFIER;
- }
-
- @Override
- public CatalogLock create(LockContext context) {
- JdbcLockContext lockContext = (JdbcLockContext) context;
- return new JdbcCatalogLock(
- lockContext.connections,
- lockContext.catalogKey,
- checkMaxSleep(lockContext.conf.toMap()),
- acquireTimeout(lockContext.conf.toMap()));
- }
- }
-
- /** Jdbc lock context. */
- public static class JdbcLockContext implements LockContext {
- private final JdbcClientPool connections;
- private final String catalogKey;
- private final Options conf;
-
- public JdbcLockContext(JdbcClientPool connections, String catalogKey,
Options conf) {
- this.connections = connections;
- this.catalogKey = catalogKey;
- this.conf = conf;
- }
- }
-
public static long checkMaxSleep(Map<String, String> conf) {
return TimeUtils.parseDuration(
conf.getOrDefault(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLockContext.java
b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLockContext.java
new file mode 100644
index 000000000..e56b3474c
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLockContext.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.jdbc;
+
+import org.apache.paimon.catalog.CatalogLockContext;
+import org.apache.paimon.options.Options;
+
+/** Jdbc lock context. */
+public class JdbcCatalogLockContext implements CatalogLockContext {
+
+ private final JdbcClientPool connections;
+ private final String catalogKey;
+ private final Options options;
+
+ public JdbcCatalogLockContext(JdbcClientPool connections, String
catalogKey, Options options) {
+ this.connections = connections;
+ this.catalogKey = catalogKey;
+ this.options = options;
+ }
+
+ @Override
+ public Options options() {
+ return options;
+ }
+
+ public JdbcClientPool connections() {
+ return connections;
+ }
+
+ public String catalogKey() {
+ return catalogKey;
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLockFactory.java
b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLockFactory.java
new file mode 100644
index 000000000..ce0a2d24e
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLockFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.jdbc;
+
+import org.apache.paimon.catalog.CatalogLock;
+import org.apache.paimon.catalog.CatalogLockContext;
+import org.apache.paimon.catalog.CatalogLockFactory;
+
+import java.util.Map;
+
+import static org.apache.paimon.jdbc.JdbcCatalogLock.acquireTimeout;
+import static org.apache.paimon.jdbc.JdbcCatalogLock.checkMaxSleep;
+
+/** Jdbc catalog lock factory. */
+public class JdbcCatalogLockFactory implements CatalogLockFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final String IDENTIFIER = "jdbc";
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public CatalogLock createLock(CatalogLockContext context) {
+ JdbcCatalogLockContext lockContext = (JdbcCatalogLockContext) context;
+ Map<String, String> optionsMap = lockContext.options().toMap();
+ return new JdbcCatalogLock(
+ lockContext.connections(),
+ lockContext.catalogKey(),
+ checkMaxSleep(optionsMap),
+ acquireTimeout(optionsMap));
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/Lock.java
b/paimon-core/src/main/java/org/apache/paimon/operation/Lock.java
index a9f27e70a..76cd8b178 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/Lock.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/Lock.java
@@ -20,6 +20,8 @@ package org.apache.paimon.operation;
import org.apache.paimon.annotation.Public;
import org.apache.paimon.catalog.CatalogLock;
+import org.apache.paimon.catalog.CatalogLockContext;
+import org.apache.paimon.catalog.CatalogLockFactory;
import org.apache.paimon.catalog.Identifier;
import javax.annotation.Nullable;
@@ -44,12 +46,12 @@ public interface Lock extends AutoCloseable {
}
static Factory factory(
- @Nullable CatalogLock.LockFactory lockFactory,
- @Nullable CatalogLock.LockContext lockContext,
+ @Nullable CatalogLockFactory lockFactory,
+ @Nullable CatalogLockContext lockContext,
Identifier tablePath) {
return lockFactory == null
? new EmptyFactory()
- : new CatalogLockFactory(lockFactory, lockContext, tablePath);
+ : new LockFactory(lockFactory, lockContext, tablePath);
}
static Factory emptyFactory() {
@@ -57,17 +59,17 @@ public interface Lock extends AutoCloseable {
}
/** A {@link Factory} creating lock from catalog. */
- class CatalogLockFactory implements Factory {
+ class LockFactory implements Factory {
private static final long serialVersionUID = 1L;
- private final CatalogLock.LockFactory lockFactory;
- private final CatalogLock.LockContext lockContext;
+ private final CatalogLockFactory lockFactory;
+ private final CatalogLockContext lockContext;
private final Identifier tablePath;
- public CatalogLockFactory(
- CatalogLock.LockFactory lockFactory,
- CatalogLock.LockContext lockContext,
+ public LockFactory(
+ CatalogLockFactory lockFactory,
+ CatalogLockContext lockContext,
Identifier tablePath) {
this.lockFactory = lockFactory;
this.lockContext = lockContext;
@@ -76,7 +78,7 @@ public interface Lock extends AutoCloseable {
@Override
public Lock create() {
- return fromCatalog(lockFactory.create(lockContext), tablePath);
+ return fromCatalog(lockFactory.createLock(lockContext), tablePath);
}
}
diff --git
a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 34de4106b..0f87c96b0 100644
---
a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++
b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -15,4 +15,4 @@
org.apache.paimon.catalog.FileSystemCatalogFactory
org.apache.paimon.jdbc.JdbcCatalogFactory
-org.apache.paimon.jdbc.JdbcCatalogLock$JdbcCatalogLockFactory
+org.apache.paimon.jdbc.JdbcCatalogLockFactory
diff --git
a/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java
b/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java
deleted file mode 100644
index 0948ab07c..000000000
---
a/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.catalog;
-
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.jdbc.JdbcCatalog;
-import org.apache.paimon.options.CatalogOptions;
-import org.apache.paimon.options.Options;
-
-import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;
-
-import org.junit.jupiter.api.BeforeEach;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Tests for {@link FileSystemCatalog}. */
-public class FileSystemCatalogTest extends CatalogTestBase {
-
- @BeforeEach
- public void setUp() throws Exception {
- super.setUp();
- catalog = initCatalog(Maps.newHashMap());
- }
-
- private FileSystemCatalog initCatalog(Map<String, String> props) {
- Map<String, String> properties = Maps.newHashMap();
- properties.put(
- CatalogOptions.URI.key(),
- "jdbc:sqlite:file::memory:?ic" +
UUID.randomUUID().toString().replace("-", ""));
-
- properties.put(JdbcCatalog.PROPERTY_PREFIX + "username", "user");
- properties.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password");
- properties.put(CatalogOptions.WAREHOUSE.key(), warehouse);
- properties.put(CatalogOptions.LOCK_ENABLED.key(), "true");
- properties.put(CatalogOptions.LOCK_TYPE.key(), "jdbc");
- properties.putAll(props);
- FileSystemCatalog catalog =
- new FileSystemCatalog(fileIO, new Path(warehouse),
Options.fromMap(properties));
- return catalog;
- }
-
- @Override
- public void testListDatabasesWhenNoDatabases() {
- List<String> databases = catalog.listDatabases();
- assertThat(databases).isEqualTo(new ArrayList<>());
- }
-}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
index b68d65dd2..50d28cd11 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
@@ -21,6 +21,8 @@ package org.apache.paimon.flink;
import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogLock;
+import org.apache.paimon.catalog.CatalogLockContext;
+import org.apache.paimon.catalog.CatalogLockFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.fs.Path;
@@ -154,18 +156,6 @@ public class FileSystemCatalogITCase extends
AbstractTestBase {
@Test
void testCatalogWithLockForSchema() throws Exception {
LOCK_COUNT.set(0);
- assertThatThrownBy(
- () ->
- tEnv.executeSql(
- String.format(
- "CREATE CATALOG
fs_with_lock WITH ("
- +
"'type'='paimon', "
- +
"'warehouse'='%s', "
- +
"'lock.enabled'='true'"
- + ")",
- path))
- .await())
- .hasRootCauseMessage("No lock type when lock is enabled.");
tEnv.executeSql(
String.format(
"CREATE CATALOG fs_with_lock WITH ("
@@ -203,7 +193,8 @@ public class FileSystemCatalogITCase extends
AbstractTestBase {
}
/** Lock factory for file system catalog. */
- public static class FileSystemCatalogDummyLockFactory implements
CatalogLock.LockFactory {
+ public static class FileSystemCatalogDummyLockFactory implements
CatalogLockFactory {
+
private static final String IDENTIFIER = "DUMMY";
@Override
@@ -212,7 +203,7 @@ public class FileSystemCatalogITCase extends
AbstractTestBase {
}
@Override
- public CatalogLock create(CatalogLock.LockContext context) {
+ public CatalogLock createLock(CatalogLockContext context) {
return new CatalogLock() {
@Override
public <T> T runWithLock(String database, String table,
Callable<T> callable)
diff --git
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 589e92037..372bfedef 100644
---
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -23,7 +23,8 @@ import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.catalog.CatalogLock;
+import org.apache.paimon.catalog.CatalogLockContext;
+import org.apache.paimon.catalog.CatalogLockFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
@@ -77,15 +78,12 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE;
-import static org.apache.paimon.hive.HiveCatalogLock.LOCK_IDENTIFIER;
import static org.apache.paimon.hive.HiveCatalogLock.acquireTimeout;
import static org.apache.paimon.hive.HiveCatalogLock.checkMaxSleep;
import static org.apache.paimon.hive.HiveCatalogOptions.HADOOP_CONF_DIR;
import static org.apache.paimon.hive.HiveCatalogOptions.HIVE_CONF_DIR;
import static org.apache.paimon.hive.HiveCatalogOptions.IDENTIFIER;
import static org.apache.paimon.hive.HiveCatalogOptions.LOCATION_IN_PROPERTIES;
-import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
-import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE;
import static
org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -93,6 +91,7 @@ import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly;
/** A catalog implementation for Hive. */
public class HiveCatalog extends AbstractCatalog {
+
private static final Logger LOG =
LoggerFactory.getLogger(HiveCatalog.class);
// Reserved properties
@@ -149,10 +148,15 @@ public class HiveCatalog extends AbstractCatalog {
}
@Override
- public Optional<CatalogLock.LockContext> lockContext() {
+ public Optional<CatalogLockFactory> defaultLockFactory() {
+ return Optional.of(new HiveCatalogLockFactory());
+ }
+
+ @Override
+ public Optional<CatalogLockContext> lockContext() {
return Optional.of(
- new HiveCatalogLock.HiveLockContext(
- new SerializableHiveConf(hiveConf), clientClassName));
+ new HiveCatalogLockContext(
+ new SerializableHiveConf(hiveConf), clientClassName, catalogOptions));
}
@Override
@@ -635,7 +639,7 @@ public class HiveCatalog extends AbstractCatalog {
try (InputStream inputStream =
hiveSite.getFileSystem(hadoopConf).open(hiveSite)) {
hiveConf.addResource(inputStream, hiveSite.toString());
// trigger a read from the conf to avoid input stream is closed
- isEmbeddedMetastore(hiveConf);
+ hiveConf.getVar(HiveConf.ConfVars.METASTOREURIS);
} catch (IOException e) {
throw new RuntimeException(
"Failed to load hive-site.xml from specified path:" +
hiveSite, e);
@@ -656,10 +660,6 @@ public class HiveCatalog extends AbstractCatalog {
}
}
- public static boolean isEmbeddedMetastore(HiveConf hiveConf) {
- return isNullOrWhitespaceOnly(hiveConf.getVar(HiveConf.ConfVars.METASTOREURIS));
- }
-
public static Catalog createHiveCatalog(CatalogContext context) {
HiveConf hiveConf = createHiveConf(context);
Options options = context.options();
@@ -680,14 +680,6 @@ public class HiveCatalog extends AbstractCatalog {
} catch (IOException e) {
throw new UncheckedIOException(e);
}
-
- /** Hive catalog only support hive lock. */
- if (options.getOptional(LOCK_ENABLED).orElse(false)) {
- Optional<String> lockType = options.getOptional(LOCK_TYPE);
- if (!lockType.isPresent()) {
- options.set(LOCK_TYPE, LOCK_IDENTIFIER);
- }
- }
return new HiveCatalog(
fileIO,
hiveConf,
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java
index c49cd020c..8c3d3829e 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java
@@ -39,7 +39,6 @@ import java.util.concurrent.Callable;
import static org.apache.paimon.options.CatalogOptions.LOCK_ACQUIRE_TIMEOUT;
import static org.apache.paimon.options.CatalogOptions.LOCK_CHECK_MAX_SLEEP;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Hive {@link CatalogLock}. */
public class HiveCatalogLock implements CatalogLock {
@@ -114,28 +113,6 @@ public class HiveCatalogLock implements CatalogLock {
this.client.close();
}
- /** Catalog lock factory for hive. */
- public static class HiveCatalogLockFactory implements LockFactory {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public CatalogLock create(LockContext context) {
- checkArgument(context instanceof HiveLockContext);
- HiveLockContext hiveLockContext = (HiveLockContext) context;
- HiveConf conf = hiveLockContext.hiveConf.conf();
- return new HiveCatalogLock(
- HiveCatalog.createClient(conf, hiveLockContext.clientClassName),
- checkMaxSleep(conf),
- acquireTimeout(conf));
- }
-
- @Override
- public String identifier() {
- return LOCK_IDENTIFIER;
- }
- }
-
public static long checkMaxSleep(HiveConf conf) {
return TimeUtils.parseDuration(
conf.get(
@@ -151,14 +128,4 @@ public class HiveCatalogLock implements CatalogLock {
TimeUtils.getStringInMillis(LOCK_ACQUIRE_TIMEOUT.defaultValue())))
.toMillis();
}
-
- static class HiveLockContext implements LockContext {
- private final SerializableHiveConf hiveConf;
- private final String clientClassName;
-
- public HiveLockContext(SerializableHiveConf hiveConf, String clientClassName) {
- this.hiveConf = hiveConf;
- this.clientClassName = clientClassName;
- }
- }
}
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLockContext.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLockContext.java
new file mode 100644
index 000000000..ecffd7f1e
--- /dev/null
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLockContext.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hive;
+
+import org.apache.paimon.catalog.CatalogLockContext;
+import org.apache.paimon.options.Options;
+
+/** Hive {@link CatalogLockContext}. */
+public class HiveCatalogLockContext implements CatalogLockContext {
+
+ private final SerializableHiveConf hiveConf;
+ private final String clientClassName;
+ private final Options options;
+
+ public HiveCatalogLockContext(
+ SerializableHiveConf hiveConf, String clientClassName, Options options) {
+ this.hiveConf = hiveConf;
+ this.clientClassName = clientClassName;
+ this.options = options;
+ }
+
+ @Override
+ public Options options() {
+ return options;
+ }
+
+ public SerializableHiveConf hiveConf() {
+ return hiveConf;
+ }
+
+ public String clientClassName() {
+ return clientClassName;
+ }
+}
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLockFactory.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLockFactory.java
new file mode 100644
index 000000000..7c05ce3ee
--- /dev/null
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLockFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hive;
+
+import org.apache.paimon.catalog.CatalogLock;
+import org.apache.paimon.catalog.CatalogLockContext;
+import org.apache.paimon.catalog.CatalogLockFactory;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+
+import static org.apache.paimon.hive.HiveCatalogLock.LOCK_IDENTIFIER;
+import static org.apache.paimon.hive.HiveCatalogLock.acquireTimeout;
+import static org.apache.paimon.hive.HiveCatalogLock.checkMaxSleep;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** Catalog lock factory for hive. */
+public class HiveCatalogLockFactory implements CatalogLockFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public CatalogLock createLock(CatalogLockContext context) {
+ checkArgument(context instanceof HiveCatalogLockContext);
+ HiveCatalogLockContext hiveLockContext = (HiveCatalogLockContext) context;
+ HiveConf conf = hiveLockContext.hiveConf().conf();
+ return new HiveCatalogLock(
+ HiveCatalog.createClient(conf, hiveLockContext.clientClassName()),
+ checkMaxSleep(conf),
+ acquireTimeout(conf));
+ }
+
+ @Override
+ public String identifier() {
+ return LOCK_IDENTIFIER;
+ }
+}
diff --git a/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index d4af13cc0..baab92184 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ b/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -14,6 +14,4 @@
# limitations under the License.
org.apache.paimon.hive.HiveCatalogFactory
-
-# Hive catalog lock factory
-org.apache.paimon.hive.HiveCatalogLock$HiveCatalogLockFactory
+org.apache.paimon.hive.HiveCatalogLockFactory
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
index 668c88f1f..4d9753bab 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
@@ -20,6 +20,7 @@ package org.apache.paimon.hive;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogLock;
+import org.apache.paimon.catalog.CatalogLockFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalog;
import org.apache.paimon.hive.annotation.Minio;
@@ -695,7 +696,7 @@ public abstract class HiveCatalogITCaseBase {
tEnv.executeSql("CREATE TABLE t (a INT)");
Catalog catalog =
((FlinkCatalog)
tEnv.getCatalog(tEnv.getCurrentCatalog()).get()).catalog();
- CatalogLock.LockFactory lockFactory = catalog.lockFactory().get();
+ CatalogLockFactory lockFactory = catalog.lockFactory().get();
AtomicInteger count = new AtomicInteger(0);
List<Thread> threads = new ArrayList<>();
@@ -710,7 +711,8 @@ public abstract class HiveCatalogITCaseBase {
Thread thread =
new Thread(
() -> {
- CatalogLock lock = lockFactory.create(catalog.lockContext().get());
+ CatalogLock lock =
+ lockFactory.createLock(catalog.lockContext().get());
for (int j = 0; j < 10; j++) {
try {
lock.runWithLock("test_db", "t",
unsafeIncrement);