This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 9d2973728 [core] Support jdbc catalog (#2866)
9d2973728 is described below
commit 9d297372819f8eecc7347d3521b9508d49a54da9
Author: Xiaojian Sun <[email protected]>
AuthorDate: Wed Mar 6 14:07:04 2024 +0800
[core] Support jdbc catalog (#2866)
---
docs/content/how-to/creating-catalogs.md | 40 ++
docs/content/maintenance/configurations.md | 6 +
.../generated/catalog_configuration.html | 8 +-
.../generated/jdbc_catalog_configuration.html | 36 ++
.../java/org/apache/paimon/client/ClientPool.java | 31 ++
.../org/apache/paimon/client/ClientPoolImpl.java | 155 ++++++++
.../org/apache/paimon/options/CatalogOptions.java | 9 +-
paimon-core/pom.xml | 7 +
.../jdbc/AbstractDistributedLockDialect.java | 93 +++++
.../paimon/jdbc/DistributedLockDialectFactory.java | 42 ++
.../java/org/apache/paimon/jdbc/JdbcCatalog.java | 431 +++++++++++++++++++++
.../org/apache/paimon/jdbc/JdbcCatalogFactory.java | 42 ++
.../org/apache/paimon/jdbc/JdbcCatalogLock.java | 137 +++++++
.../org/apache/paimon/jdbc/JdbcCatalogOptions.java | 34 ++
.../org/apache/paimon/jdbc/JdbcClientPool.java | 81 ++++
.../paimon/jdbc/JdbcDistributedLockDialect.java | 35 ++
.../java/org/apache/paimon/jdbc/JdbcUtils.java | 428 ++++++++++++++++++++
.../paimon/jdbc/MysqlDistributedLockDialect.java | 73 ++++
.../paimon/jdbc/SqlLiteDistributedLockDialect.java | 73 ++++
.../services/org.apache.paimon.factories.Factory | 1 +
.../org/apache/paimon/jdbc/JdbcCatalogTest.java | 110 ++++++
.../configuration/ConfigOptionsDocGenerator.java | 1 +
22 files changed, 1871 insertions(+), 2 deletions(-)
diff --git a/docs/content/how-to/creating-catalogs.md
b/docs/content/how-to/creating-catalogs.md
index a3de5df31..43d23dc1f 100644
--- a/docs/content/how-to/creating-catalogs.md
+++ b/docs/content/how-to/creating-catalogs.md
@@ -30,6 +30,7 @@ Paimon catalogs currently support two types of metastores:
* `filesystem` metastore (default), which stores both metadata and table files
in filesystems.
* `hive` metastore, which additionally stores metadata in Hive metastore.
Users can directly access the tables from Hive.
+* `jdbc` metastore, which additionally stores metadata in relational databases
such as MySQL, Postgres, etc.
See [CatalogOptions]({{< ref "maintenance/configurations#catalogoptions" >}})
for detailed options when creating a catalog.
@@ -175,3 +176,42 @@ Using the table option facilitates the convenient
definition of Hive table param
Parameters prefixed with `hive.` will be automatically defined in the
`TBLPROPERTIES` of the Hive table.
For instance, using the option `hive.table.owner=Jon` will automatically add
the parameter `table.owner=Jon` to the table properties during the creation
process.
+## Creating a Catalog with JDBC Metastore
+
+By using the Paimon JDBC catalog, changes to the catalog will be directly
stored in relational databases such as SQLite, MySQL, postgres, etc.
+
+Currently, lock configuration is only supported for MySQL and SQLite. If you
are using a different type of database for catalog storage, please do not
configure `lock.enabled`.
+
+{{< tabs "jdbc-metastore-example" >}}
+
+{{< tab "Flink" >}}
+
+To use the Paimon JDBC catalog in Flink, the JDBC driver jar for your database
must be on the classpath. Download the bundled JDBC connector jar for your
database (for example MySQL or Postgres) and add it to the classpath.
+
+| database type | Bundle Name | SQL Client JAR
|
+|:--------------|:---------------------|:---------------------------------------------------------------------------|
+| mysql | mysql-connector-java |
[Download](https://mvnrepository.com/artifact/mysql/mysql-connector-java) |
+| postgres | postgresql |
[Download](https://mvnrepository.com/artifact/org.postgresql/postgresql) |
+
+```sql
+CREATE CATALOG my_jdbc WITH (
+ 'type' = 'paimon',
+ 'metastore' = 'jdbc',
+ 'uri' = 'jdbc:mysql://<host>:<port>/<databaseName>',
+ 'jdbc.user' = '...',
+ 'jdbc.password' = '...',
+ 'catalog-key'='jdbc',
+ 'warehouse' = 'hdfs:///path/to/warehouse'
+);
+
+USE CATALOG my_jdbc;
+```
+You can configure any connection parameter supported by your JDBC driver by
prefixing it with "jdbc.". The available connection parameters differ between
databases, so configure them according to your actual setup.
+
+You can also perform logical isolation for databases under multiple catalogs
by specifying "catalog-key".
+
+You can define any default table options with the prefix `table-default.` for
tables created in the catalog.
+
+{{< /tab >}}
+
+{{< /tabs >}}
diff --git a/docs/content/maintenance/configurations.md
b/docs/content/maintenance/configurations.md
index f48956462..87d59cd34 100644
--- a/docs/content/maintenance/configurations.md
+++ b/docs/content/maintenance/configurations.md
@@ -50,6 +50,12 @@ Options for Hive catalog.
{{< generated/hive_catalog_configuration >}}
+### JdbcCatalogOptions
+
+Options for Jdbc catalog.
+
+{{< generated/jdbc_catalog_configuration >}}
+
### FlinkCatalogOptions
Flink catalog options for paimon.
diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html
b/docs/layouts/shortcodes/generated/catalog_configuration.html
index d243737a8..e68555944 100644
--- a/docs/layouts/shortcodes/generated/catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/catalog_configuration.html
@@ -26,6 +26,12 @@ under the License.
</tr>
</thead>
<tbody>
+ <tr>
+ <td><h5>client-pool-size</h5></td>
+ <td style="word-wrap: break-word;">2</td>
+ <td>Integer</td>
+ <td>Configure the size of the connection pool.</td>
+ </tr>
<tr>
<td><h5>fs.allow-hadoop-fallback</h5></td>
<td style="word-wrap: break-word;">true</td>
@@ -60,7 +66,7 @@ under the License.
<td><h5>metastore</h5></td>
<td style="word-wrap: break-word;">"filesystem"</td>
<td>String</td>
- <td>Metastore of paimon catalog, supports filesystem and hive.</td>
+ <td>Metastore of paimon catalog, supports filesystem、hive and
jdbc.</td>
</tr>
<tr>
<td><h5>table.type</h5></td>
diff --git a/docs/layouts/shortcodes/generated/jdbc_catalog_configuration.html
b/docs/layouts/shortcodes/generated/jdbc_catalog_configuration.html
new file mode 100644
index 000000000..617018ce9
--- /dev/null
+++ b/docs/layouts/shortcodes/generated/jdbc_catalog_configuration.html
@@ -0,0 +1,36 @@
+{{/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/}}
+<table class="configuration table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 20%">Key</th>
+ <th class="text-left" style="width: 15%">Default</th>
+ <th class="text-left" style="width: 10%">Type</th>
+ <th class="text-left" style="width: 55%">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>catalog-key</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Custom jdbc catalog store key.</td>
+ </tr>
+ </tbody>
+</table>
diff --git
a/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java
b/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java
new file mode 100644
index 000000000..f0e4f0741
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.client;
+
+/**
+ * A pool that hands a client of type {@code C} to an action and returns the action's result.
+ *
+ * <p>Source: [core/src/main/java/org/apache/iceberg/ClientPool.java].
+ *
+ * @param <C> client type passed to actions
+ * @param <E> exception type that actions may throw
+ */
+public interface ClientPool<C, E extends Exception> {
+ /** Action interface for client. */
+ interface Action<R, C, E extends Exception> {
+ /** Runs this action against {@code client} and returns its result. */
+ R run(C client) throws E;
+ }
+
+ /**
+ * Runs {@code action} with a client from the pool.
+ *
+ * <p>NOTE(review): whether a failed action is retried is decided by the implementation.
+ */
+ <R> R run(Action<R, C, E> action) throws E, InterruptedException;
+
+ /**
+ * Runs {@code action} with a client from the pool.
+ *
+ * @param retry presumably whether the action may be re-run after a failure; the exact
+ *     semantics are defined by the implementation
+ */
+ <R> R run(Action<R, C, E> action, boolean retry) throws E,
InterruptedException;
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/client/ClientPoolImpl.java
b/paimon-common/src/main/java/org/apache/paimon/client/ClientPoolImpl.java
new file mode 100644
index 000000000..dda336be2
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/client/ClientPoolImpl.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.client;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.util.ArrayDeque;
+import java.util.Deque;
+
+import static org.apache.paimon.utils.Preconditions.checkState;
+
+/**
+ * Bounded, lazily populated pool of clients implementing {@link ClientPool}.
+ *
+ * <p>Clients are created on demand through {@link #newClient()} until {@code poolSize} clients
+ * exist; further callers wait until a client is released. When an action fails with an exception
+ * of the configured reconnect type and retry is enabled, the client is reconnected and the
+ * action is run one more time.
+ *
+ * <p>Source: [core/src/main/java/org/apache/iceberg/ClientPoolImpl.java].
+ *
+ * @param <C> client type handed to actions
+ * @param <E> exception type that actions may throw
+ */
+public abstract class ClientPoolImpl<C, E extends Exception>
+        implements Closeable, ClientPool<C, E> {
+    private static final Logger LOG = LoggerFactory.getLogger(ClientPoolImpl.class);
+
+    private final int poolSize;
+    private final Deque<C> clients;
+    private final Class<? extends E> reconnectExc;
+    private final Object signal = new Object();
+    private final boolean retryByDefault;
+    private volatile int currentSize;
+    // volatile: written by close() and read by get()/isClosed() without holding a lock, so the
+    // write must be visible to other threads.
+    private volatile boolean closed;
+
+    /**
+     * @param poolSize maximum number of clients created by this pool
+     * @param reconnectExc exception type that marks a connection failure worth a reconnect
+     * @param retryByDefault retry behaviour used by {@link #run(Action)}
+     */
+    public ClientPoolImpl(int poolSize, Class<? extends E> reconnectExc, boolean retryByDefault) {
+        this.poolSize = poolSize;
+        this.reconnectExc = reconnectExc;
+        this.clients = new ArrayDeque<>(poolSize);
+        this.currentSize = 0;
+        this.closed = false;
+        this.retryByDefault = retryByDefault;
+    }
+
+    /** Runs {@code action} using the pool's default retry behaviour. */
+    @Override
+    public <R> R run(Action<R, C, E> action) throws E, InterruptedException {
+        return run(action, retryByDefault);
+    }
+
+    /**
+     * Runs {@code action} with a pooled client and always returns the client to the pool.
+     *
+     * <p>If {@code retry} is set and the failure is a connection exception, the client is
+     * reconnected and the action is executed exactly one more time.
+     */
+    @Override
+    public <R> R run(Action<R, C, E> action, boolean retry) throws E, InterruptedException {
+        C client = get();
+        try {
+            return action.run(client);
+        } catch (Exception exc) {
+            if (retry && isConnectionException(exc)) {
+                try {
+                    client = reconnect(client);
+                } catch (Exception ignored) {
+                    // if reconnection throws any exception, rethrow the original failure
+                    throw reconnectExc.cast(exc);
+                }
+
+                return action.run(client);
+            }
+
+            throw exc;
+
+        } finally {
+            release(client);
+        }
+    }
+
+    /** Creates a new client; called while the pool still has spare capacity. */
+    protected abstract C newClient();
+
+    /** Re-establishes {@code client} after a connection failure and returns the usable client. */
+    protected abstract C reconnect(C client);
+
+    /** Returns whether {@code exc} is an instance of the configured reconnect exception type. */
+    protected boolean isConnectionException(Exception exc) {
+        return reconnectExc.isInstance(exc);
+    }
+
+    /** Closes a single client. */
+    protected abstract void close(C client);
+
+    /**
+     * Marks the pool closed and closes every client, waiting for clients that are currently
+     * borrowed to be released. If the waiting thread is interrupted, the interrupt flag is
+     * restored and the remaining clients are left open.
+     */
+    @Override
+    public void close() {
+        this.closed = true;
+        try {
+            while (currentSize > 0) {
+                if (!clients.isEmpty()) {
+                    synchronized (this) {
+                        if (!clients.isEmpty()) {
+                            C client = clients.removeFirst();
+                            close(client);
+                            currentSize -= 1;
+                        }
+                    }
+                }
+                if (clients.isEmpty() && currentSize > 0) {
+                    synchronized (signal) {
+                        // wake every second in case this missed the signal
+                        signal.wait(1000);
+                    }
+                }
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            LOG.warn("Interrupted while shutting down pool. Some clients may not be closed.", e);
+        }
+    }
+
+    /** Borrows an idle client, creates one if capacity allows, or waits for a release. */
+    private C get() throws InterruptedException {
+        checkState(!closed, "Cannot get a client from a closed pool");
+        while (true) {
+            if (!clients.isEmpty() || currentSize < poolSize) {
+                synchronized (this) {
+                    if (!clients.isEmpty()) {
+                        return clients.removeFirst();
+                    } else if (currentSize < poolSize) {
+                        C client = newClient();
+                        currentSize += 1;
+                        return client;
+                    }
+                }
+            }
+            synchronized (signal) {
+                // wake every second in case this missed the signal
+                signal.wait(1000);
+            }
+        }
+    }
+
+    /** Returns {@code client} to the pool and wakes one waiting thread. */
+    private void release(C client) {
+        synchronized (this) {
+            clients.addFirst(client);
+        }
+        synchronized (signal) {
+            signal.notify();
+        }
+    }
+
+    /** Returns the maximum number of clients this pool creates. */
+    public int poolSize() {
+        return poolSize;
+    }
+
+    /** Returns whether {@link #close()} has been called. */
+    public boolean isClosed() {
+        return closed;
+    }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
index 0fba499dd..42cd9e418 100644
--- a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
@@ -39,7 +39,8 @@ public class CatalogOptions {
ConfigOptions.key("metastore")
.stringType()
.defaultValue("filesystem")
- .withDescription("Metastore of paimon catalog, supports
filesystem and hive.");
+ .withDescription(
+ "Metastore of paimon catalog, supports
filesystem、hive and jdbc.");
public static final ConfigOption<String> URI =
ConfigOptions.key("uri")
@@ -78,6 +79,12 @@ public class CatalogOptions {
.withDescription(
"Allow to fallback to hadoop File IO when no file
io found for the scheme.");
+ public static final ConfigOption<Integer> CLIENT_POOL_SIZE =
+ key("client-pool-size")
+ .intType()
+ .defaultValue(2)
+ .withDescription("Configure the size of the connection
pool.");
+
public static final ConfigOption<String> LINEAGE_META =
key("lineage-meta")
.stringType()
diff --git a/paimon-core/pom.xml b/paimon-core/pom.xml
index 0829c7da2..2652286b0 100644
--- a/paimon-core/pom.xml
+++ b/paimon-core/pom.xml
@@ -190,6 +190,13 @@ under the License.
<version>3.6.1</version>
</dependency>
+ <dependency>
+ <groupId>org.xerial</groupId>
+ <artifactId>sqlite-jdbc</artifactId>
+ <version>3.44.0.0</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<build>
diff --git
a/paimon-core/src/main/java/org/apache/paimon/jdbc/AbstractDistributedLockDialect.java
b/paimon-core/src/main/java/org/apache/paimon/jdbc/AbstractDistributedLockDialect.java
new file mode 100644
index 000000000..f3469d0b5
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/jdbc/AbstractDistributedLockDialect.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.jdbc;
+
+import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/** Jdbc distributed lock interface. */
+public abstract class AbstractDistributedLockDialect implements
JdbcDistributedLockDialect {
+
+ @Override
+ public void createTable(JdbcClientPool connections) throws SQLException,
InterruptedException {
+ connections.run(
+ conn -> {
+ DatabaseMetaData dbMeta = conn.getMetaData();
+ ResultSet tableExists =
+ dbMeta.getTables(
+ null, null,
JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME, null);
+ if (tableExists.next()) {
+ return true;
+ }
+ return
conn.prepareStatement(getCreateTableSql()).execute();
+ });
+ }
+
+ public abstract String getCreateTableSql();
+
+ @Override
+ public boolean lockAcquire(JdbcClientPool connections, String lockId, long
timeoutMillSeconds)
+ throws SQLException, InterruptedException {
+ return connections.run(
+ connection -> {
+ try (PreparedStatement preparedStatement =
+ connection.prepareStatement(getLockAcquireSql())) {
+ preparedStatement.setString(1, lockId);
+ preparedStatement.setLong(2, timeoutMillSeconds /
1000);
+ return preparedStatement.executeUpdate() > 0;
+ } catch (SQLException ex) {
+ return false;
+ }
+ });
+ }
+
+ public abstract String getLockAcquireSql();
+
+ @Override
+ public boolean releaseLock(JdbcClientPool connections, String lockId)
+ throws SQLException, InterruptedException {
+ return connections.run(
+ connection -> {
+ try (PreparedStatement preparedStatement =
+ connection.prepareStatement(getReleaseLockSql())) {
+ preparedStatement.setString(1, lockId);
+ return preparedStatement.executeUpdate() > 0;
+ }
+ });
+ }
+
+ public abstract String getReleaseLockSql();
+
+ @Override
+ public int tryReleaseTimedOutLock(JdbcClientPool connections, String
lockId)
+ throws SQLException, InterruptedException {
+ return connections.run(
+ connection -> {
+ try (PreparedStatement preparedStatement =
+
connection.prepareStatement(getTryReleaseTimedOutLock())) {
+ preparedStatement.setString(1, lockId);
+ return preparedStatement.executeUpdate();
+ }
+ });
+ }
+
+ public abstract String getTryReleaseTimedOutLock();
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/jdbc/DistributedLockDialectFactory.java
b/paimon-core/src/main/java/org/apache/paimon/jdbc/DistributedLockDialectFactory.java
new file mode 100644
index 000000000..197845638
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/jdbc/DistributedLockDialectFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.jdbc;
+
+class DistributedLockDialectFactory {
+ static JdbcDistributedLockDialect create(String protocol) {
+ JdbcProtocol type = JdbcProtocol.valueOf(protocol.toUpperCase());
+ switch (type) {
+ case SQLITE:
+ return new SqlLiteDistributedLockDialect();
+ case MYSQL:
+ return new MysqlDistributedLockDialect();
+ default:
+ throw new UnsupportedOperationException(
+ String.format("Distributed locks based on %s are not
supported", protocol));
+ }
+ }
+
+ /** Supported jdbc protocol. */
+ enum JdbcProtocol {
+ SQLITE,
+ // for mysql.
+ MARIADB,
+ MYSQL;
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
new file mode 100644
index 000000000..5dc2abc9e
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.jdbc;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.catalog.AbstractCatalog;
+import org.apache.paimon.catalog.CatalogLock;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.operation.Lock;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.AbstractMap;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.paimon.jdbc.JdbcCatalogLock.acquireTimeout;
+import static org.apache.paimon.jdbc.JdbcCatalogLock.checkMaxSleep;
+import static org.apache.paimon.jdbc.JdbcUtils.execute;
+import static org.apache.paimon.jdbc.JdbcUtils.insertProperties;
+import static org.apache.paimon.jdbc.JdbcUtils.updateTable;
+import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
+
+/** Support jdbc catalog. */
+public class JdbcCatalog extends AbstractCatalog {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(JdbcCatalog.class);
+ public static final String PROPERTY_PREFIX = "jdbc.";
+ private static final String DATABASE_EXISTS_PROPERTY = "exists";
+ private JdbcClientPool connections;
+ private String catalogKey = "jdbc";
+ private Map<String, String> configuration;
+ private final String warehouse;
+
+    /**
+     * Creates the catalog, builds the JDBC client pool and ensures the catalog tables exist.
+     *
+     * @param fileIO file IO passed to the parent catalog
+     * @param catalogKey catalog store key; a blank value keeps the default {@code "jdbc"}
+     * @param config catalog options; must not be null
+     * @param warehouse warehouse location returned by {@link #warehouse()}
+     * @throws RuntimeException if the catalog tables cannot be initialized or the calling
+     *     thread is interrupted while doing so
+     */
+    protected JdbcCatalog(
+            FileIO fileIO, String catalogKey, Map<String, String> config, String warehouse) {
+        super(fileIO);
+        if (!StringUtils.isBlank(catalogKey)) {
+            this.catalogKey = catalogKey;
+        }
+        this.configuration = config;
+        this.warehouse = warehouse;
+        Preconditions.checkNotNull(configuration, "Invalid catalog properties: null");
+        this.connections =
+                new JdbcClientPool(
+                        Integer.parseInt(
+                                config.getOrDefault(
+                                        CatalogOptions.CLIENT_POOL_SIZE.key(),
+                                        CatalogOptions.CLIENT_POOL_SIZE.defaultValue().toString())),
+                        configuration.get(CatalogOptions.URI.key()),
+                        configuration);
+        try {
+            initializeCatalogTablesIfNeed();
+        } catch (SQLException e) {
+            throw new RuntimeException("Cannot initialize JDBC catalog", e);
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so callers can still observe the interruption.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Interrupted in call to initialize", e);
+        }
+    }
+
+ @VisibleForTesting
+ public JdbcClientPool getConnections() {
+ return connections;
+ }
+
+    /**
+     * Initialize catalog tables.
+     *
+     * <p>Creates the catalog table and the database properties table when the database metadata
+     * does not list them, and the distributed lock table when locking is enabled.
+     */
+    private void initializeCatalogTablesIfNeed() throws SQLException, InterruptedException {
+        String uri = configuration.get(CatalogOptions.URI.key());
+        Preconditions.checkNotNull(uri, "JDBC connection URI is required");
+        // Check and create catalog table.
+        createTableIfMissing(JdbcUtils.CATALOG_TABLE_NAME, JdbcUtils.CREATE_CATALOG_TABLE);
+
+        // Check and create database properties table.
+        createTableIfMissing(
+                JdbcUtils.DATABASE_PROPERTIES_TABLE_NAME,
+                JdbcUtils.CREATE_DATABASE_PROPERTIES_TABLE);
+
+        // if lock enabled, Check and create distributed lock table.
+        if (lockEnabled()) {
+            JdbcUtils.createDistributedLockTable(connections);
+        }
+    }
+
+    /**
+     * Runs {@code createSql} unless the database metadata already lists {@code tableName}.
+     *
+     * <p>The metadata result set and the DDL statement are closed before the pooled connection
+     * is returned.
+     */
+    private void createTableIfMissing(String tableName, String createSql)
+            throws SQLException, InterruptedException {
+        connections.run(
+                conn -> {
+                    DatabaseMetaData dbMeta = conn.getMetaData();
+                    try (ResultSet tableExists = dbMeta.getTables(null, null, tableName, null)) {
+                        if (tableExists.next()) {
+                            return true;
+                        }
+                    }
+                    try (PreparedStatement createTable = conn.prepareStatement(createSql)) {
+                        return createTable.execute();
+                    }
+                });
+    }
+
+ @Override
+ public String warehouse() {
+ return warehouse;
+ }
+
+    /**
+     * Lists the databases stored under this catalog key.
+     *
+     * <p>Names are read from the table rows and from the database property rows. A database
+     * that appears in both sources is returned only once, in first-seen order.
+     */
+    @Override
+    public List<String> listDatabases() {
+        List<String> fromTables =
+                fetch(
+                        row -> row.getString(JdbcUtils.TABLE_DATABASE),
+                        JdbcUtils.LIST_ALL_TABLE_DATABASES_SQL,
+                        catalogKey);
+        List<String> fromProperties =
+                fetch(
+                        row -> row.getString(JdbcUtils.DATABASE_NAME),
+                        JdbcUtils.LIST_ALL_PROPERTY_DATABASES_SQL,
+                        catalogKey);
+
+        List<String> databases = Lists.newArrayList();
+        for (String database : fromTables) {
+            if (!databases.contains(database)) {
+                databases.add(database);
+            }
+        }
+        for (String database : fromProperties) {
+            if (!databases.contains(database)) {
+                databases.add(database);
+            }
+        }
+        return databases;
+    }
+
+ @Override
+ protected boolean databaseExistsImpl(String databaseName) {
+ return JdbcUtils.databaseExists(connections, catalogKey, databaseName);
+ }
+
+ @Override
+ protected Map<String, String> loadDatabasePropertiesImpl(String
databaseName) {
+ if (!databaseExists(databaseName)) {
+ throw new RuntimeException(String.format("Database does not exist:
%s", databaseName));
+ }
+ Map<String, String> properties = Maps.newHashMap();
+ properties.putAll(fetchProperties(databaseName));
+ if (!properties.containsKey(DB_LOCATION_PROP)) {
+ properties.put(DB_LOCATION_PROP,
newDatabasePath(databaseName).getName());
+ }
+ properties.remove(DATABASE_EXISTS_PROPERTY);
+ return ImmutableMap.copyOf(properties);
+ }
+
+ @Override
+ protected void createDatabaseImpl(String name, Map<String, String>
properties) {
+ if (databaseExists(name)) {
+ throw new RuntimeException(String.format("Database already exists:
%s", name));
+ }
+
+ Map<String, String> createProps = new HashMap<>();
+ createProps.put(DATABASE_EXISTS_PROPERTY, "true");
+ if (properties != null && !properties.isEmpty()) {
+ createProps.putAll(properties);
+ }
+
+ if (!createProps.containsKey(DB_LOCATION_PROP)) {
+ Path databasePath = newDatabasePath(name);
+ createProps.put(DB_LOCATION_PROP, databasePath.toString());
+ }
+ insertProperties(connections, catalogKey, name, createProps);
+ }
+
+ @Override
+ protected void dropDatabaseImpl(String name) {
+ // Delete table from paimon_tables
+ execute(connections, JdbcUtils.DELETE_TABLES_SQL, catalogKey, name);
+ // Delete properties from paimon_database_properties
+ execute(connections, JdbcUtils.DELETE_ALL_DATABASE_PROPERTIES_SQL,
catalogKey, name);
+ }
+
+ @Override
+ protected List<String> listTablesImpl(String databaseName) {
+ if (!databaseExists(databaseName)) {
+ throw new RuntimeException(String.format("Database does not exist:
%s", databaseName));
+ }
+ return fetch(
+ row -> row.getString(JdbcUtils.TABLE_NAME),
+ JdbcUtils.LIST_TABLES_SQL,
+ catalogKey,
+ databaseName);
+ }
+
+ @Override
+ protected void dropTableImpl(Identifier identifier) {
+ try {
+ int deletedRecords =
+ execute(
+ connections,
+ JdbcUtils.DROP_TABLE_SQL,
+ catalogKey,
+ identifier.getDatabaseName(),
+ identifier.getObjectName());
+
+ if (deletedRecords == 0) {
+ LOG.info("Skipping drop, table does not exist: {}",
identifier);
+ return;
+ }
+ Path path = getDataTableLocation(identifier);
+ try {
+ if (fileIO.exists(path)) {
+ fileIO.deleteDirectoryQuietly(path);
+ }
+ } catch (Exception ex) {
+ LOG.error("Delete directory[{}] fail for table {}", path,
identifier, ex);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to drop table " +
identifier.getFullName(), e);
+ }
+ }
+
+ @Override
+ protected void createTableImpl(Identifier identifier, Schema schema) {
+ try {
+ // create table file
+ getSchemaManager(identifier).createTable(schema);
+ // Update schema metadata
+ Path path = getDataTableLocation(identifier);
+ int insertRecord =
+ connections.run(
+ conn -> {
+ try (PreparedStatement sql =
+ conn.prepareStatement(
+
JdbcUtils.DO_COMMIT_CREATE_TABLE_SQL)) {
+ sql.setString(1, catalogKey);
+ sql.setString(2,
identifier.getDatabaseName());
+ sql.setString(3,
identifier.getObjectName());
+ return sql.executeUpdate();
+ }
+ });
+ if (insertRecord == 1) {
+ LOG.debug("Successfully committed to new table: {}",
identifier);
+ } else {
+ try {
+ fileIO.deleteDirectoryQuietly(path);
+ } catch (Exception ee) {
+ LOG.error("Delete directory[{}] fail for table {}", path,
identifier, ee);
+ }
+ throw new RuntimeException(
+ String.format(
+ "Failed to create table %s in catalog %s",
+ identifier.getFullName(), catalogKey));
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to create table " +
identifier.getFullName(), e);
+ }
+ }
+
+ /**
+ * Renames a table: first updates the table row through {@code updateTable}, then renames the
+ * table directory when the source location contains at least one schema.
+ *
+ * <p>Any failure is rethrown as a {@link RuntimeException} naming the source table.
+ * NOTE(review): the metadata update is not rolled back here if the directory rename fails.
+ */
+ @Override
+ protected void renameTableImpl(Identifier fromTable, Identifier toTable) {
+ try {
+ // update table metadata info
+ updateTable(connections, catalogKey, fromTable, toTable);
+
+ Path fromPath = getDataTableLocation(fromTable);
+ if (new SchemaManager(fileIO, fromPath).listAllIds().size() > 0) {
+ // Rename the file system's table directory to keep the tables in the
+ // file system consistent with the tables recorded in the JDBC catalog.
+ Path toPath = getDataTableLocation(toTable);
+ try {
+ fileIO.rename(fromPath, toPath);
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "Failed to rename changes of table "
+ + toTable.getFullName()
+ + " to underlying files.",
+ e);
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to rename table " +
fromTable.getFullName(), e);
+ }
+ }
+
+    /**
+     * Applies {@code changes} to the table's schema through its {@link SchemaManager}.
+     *
+     * @throws RuntimeException if the table is not registered in this catalog
+     */
+    @Override
+    protected void alterTableImpl(Identifier identifier, List<SchemaChange> changes)
+            throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException {
+        if (!tableExists(identifier)) {
+            // String.format needs %s; the former "{}" placeholder dropped the table name.
+            throw new RuntimeException(
+                    String.format("Table is not exists %s", identifier.getFullName()));
+        }
+        SchemaManager schemaManager = getSchemaManager(identifier);
+        schemaManager.commitChanges(changes);
+    }
+
+ @Override
+ protected TableSchema getDataTableSchema(Identifier identifier) throws
TableNotExistException {
+ if (!tableExists(identifier)) {
+ throw new TableNotExistException(identifier);
+ }
+ Path tableLocation = getDataTableLocation(identifier);
+ return new SchemaManager(fileIO, tableLocation)
+ .latest()
+ .orElseThrow(
+ () -> new RuntimeException("There is no paimon table
in " + tableLocation));
+ }
+
+ @Override
+ public boolean tableExists(Identifier identifier) {
+ if (isSystemTable(identifier)) {
+ return super.tableExists(identifier);
+ }
+ return JdbcUtils.tableExists(
+ connections, catalogKey, identifier.getDatabaseName(),
identifier.getObjectName());
+ }
+
+ @Override
+ public boolean caseSensitive() {
+ return false;
+ }
+
+ @Override
+ public Optional<CatalogLock.LockFactory> lockFactory() {
+ return lockEnabled()
+ ? Optional.of(JdbcCatalogLock.createFactory(connections,
catalogKey, configuration))
+ : Optional.empty();
+ }
+
+ private boolean lockEnabled() {
+ return Boolean.parseBoolean(
+ configuration.getOrDefault(
+ LOCK_ENABLED.key(),
LOCK_ENABLED.defaultValue().toString()));
+ }
+
+ private Lock lock(Identifier identifier) {
+ if (!lockEnabled()) {
+ return new Lock.EmptyLock();
+ }
+ JdbcCatalogLock lock =
+ new JdbcCatalogLock(
+ connections,
+ catalogKey,
+ checkMaxSleep(configuration),
+ acquireTimeout(configuration));
+ return Lock.fromCatalog(lock, identifier);
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (!connections.isClosed()) {
+ connections.close();
+ }
+ }
+
+ private SchemaManager getSchemaManager(Identifier identifier) {
+ return new SchemaManager(fileIO, getDataTableLocation(identifier))
+ .withLock(lock(identifier));
+ }
+
+ private Map<String, String> fetchProperties(String databaseName) {
+ if (!databaseExists(databaseName)) {
+ throw new RuntimeException(String.format("Database does not exist:
%s", databaseName));
+ }
+ List<Map.Entry<String, String>> entries =
+ fetch(
+ row ->
+ new AbstractMap.SimpleImmutableEntry<>(
+
row.getString(JdbcUtils.DATABASE_PROPERTY_KEY),
+
row.getString(JdbcUtils.DATABASE_PROPERTY_VALUE)),
+ JdbcUtils.GET_ALL_DATABASE_PROPERTIES_SQL,
+ catalogKey,
+ databaseName);
+ return ImmutableMap.<String, String>builder().putAll(entries).build();
+ }
+
+ @FunctionalInterface
+ interface RowProducer<R> {
+ R apply(ResultSet result) throws SQLException;
+ }
+
+ @SuppressWarnings("checkstyle:NestedTryDepth")
+ private <R> List<R> fetch(RowProducer<R> toRow, String sql, String...
args) {
+ try {
+ return connections.run(
+ conn -> {
+ List<R> result = Lists.newArrayList();
+ try (PreparedStatement preparedStatement =
conn.prepareStatement(sql)) {
+ for (int pos = 0; pos < args.length; pos += 1) {
+ preparedStatement.setString(pos + 1,
args[pos]);
+ }
+ try (ResultSet rs =
preparedStatement.executeQuery()) {
+ while (rs.next()) {
+ result.add(toRow.apply(rs));
+ }
+ }
+ }
+ return result;
+ });
+ } catch (SQLException e) {
+ throw new RuntimeException(String.format("Failed to execute query:
%s", sql), e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted in SQL query", e);
+ }
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java
b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java
new file mode 100644
index 000000000..1c791eb5e
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.jdbc;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+
+/**
+ * {@link CatalogFactory} that builds a {@link JdbcCatalog}; it is selected by the identifier
+ * {@code "jdbc"}.
+ */
+public class JdbcCatalogFactory implements CatalogFactory {
+
+    public static final String IDENTIFIER = "jdbc";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    /**
+     * Creates the catalog from the {@code catalog-key} option and the full option map of the
+     * context.
+     *
+     * @param fileIO file access handed to the catalog
+     * @param warehouse warehouse path; only {@code warehouse.getName()} is forwarded
+     * @param context source of the catalog options
+     */
+    @Override
+    public Catalog create(FileIO fileIO, Path warehouse, CatalogContext context) {
+        // NOTE(review): getName() presumably yields just the last path component, so the scheme
+        // and parent directories of the warehouse would be dropped - confirm against what the
+        // JdbcCatalog constructor expects for its warehouse argument.
+        String warehouseName = warehouse.getName();
+        String catalogKey = context.options().get(JdbcCatalogOptions.CATALOG_KEY);
+        return new JdbcCatalog(fileIO, catalogKey, context.options().toMap(), warehouseName);
+    }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java
b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java
new file mode 100644
index 000000000..94287cb6e
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.jdbc;
+
+import org.apache.paimon.catalog.CatalogLock;
+import org.apache.paimon.utils.TimeUtils;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.time.Duration;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import static org.apache.paimon.options.CatalogOptions.LOCK_ACQUIRE_TIMEOUT;
+import static org.apache.paimon.options.CatalogOptions.LOCK_CHECK_MAX_SLEEP;
+
+/**
+ * {@link CatalogLock} that acquires and releases its locks through {@link JdbcUtils} on a shared
+ * {@link JdbcClientPool}.
+ *
+ * <p>Lock ids have the form {@code <catalogName>.<database>.<table>}.
+ */
+public class JdbcCatalogLock implements CatalogLock {
+
+    private final JdbcClientPool connections;
+    private final long checkMaxSleep;
+    private final long acquireTimeout;
+    private final String catalogName;
+
+    /**
+     * @param connections pool used for every acquire and release call
+     * @param catalogName first component of each lock id
+     * @param checkMaxSleep upper bound, in milliseconds, of the sleep between two attempts
+     * @param acquireTimeout retry budget in milliseconds; also handed to {@link
+     *     JdbcUtils#acquire}
+     */
+    public JdbcCatalogLock(
+            JdbcClientPool connections,
+            String catalogName,
+            long checkMaxSleep,
+            long acquireTimeout) {
+        this.connections = connections;
+        this.catalogName = catalogName;
+        this.checkMaxSleep = checkMaxSleep;
+        this.acquireTimeout = acquireTimeout;
+    }
+
+    /**
+     * Takes the lock for {@code database.table}, runs {@code callable} and releases the lock
+     * again, whether or not the callable fails.
+     */
+    @Override
+    public <T> T runWithLock(String database, String table, Callable<T> callable)
+            throws Exception {
+        String lockId = String.format("%s.%s.%s", catalogName, database, table);
+        lock(lockId);
+        try {
+            return callable.call();
+        } finally {
+            JdbcUtils.release(connections, lockId);
+        }
+    }
+
+    /**
+     * Tries to acquire the lock, retrying with a doubling sleep (100 ms first, capped at {@code
+     * checkMaxSleep}) until an attempt succeeds or more than {@code acquireTimeout} milliseconds
+     * have passed since the first failed attempt.
+     *
+     * @throws RuntimeException if the lock is still not held when retrying stops
+     */
+    private void lock(String lockUniqueName) throws SQLException, InterruptedException {
+        boolean acquired = JdbcUtils.acquire(connections, lockUniqueName, acquireTimeout);
+        long sleepMillis = 50;
+        long retryStart = System.currentTimeMillis();
+        while (!acquired) {
+            sleepMillis = Math.min(sleepMillis * 2, checkMaxSleep);
+            Thread.sleep(sleepMillis);
+            acquired = JdbcUtils.acquire(connections, lockUniqueName, acquireTimeout);
+            // The budget is checked after the attempt, so a late success still counts.
+            if (System.currentTimeMillis() - retryStart > acquireTimeout) {
+                break;
+            }
+        }
+        long retryDuration = System.currentTimeMillis() - retryStart;
+        if (!acquired) {
+            throw new RuntimeException(
+                    "Acquire lock failed with time: " + Duration.ofMillis(retryDuration));
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        // Do nothing
+    }
+
+    /** Create a jdbc lock factory. */
+    public static LockFactory createFactory(
+            JdbcClientPool connections, String catalogName, Map<String, String> conf) {
+        return new JdbcCatalogLockFactory(connections, catalogName, conf);
+    }
+
+    /**
+     * {@link LockFactory} that creates {@link JdbcCatalogLock} instances sharing one pool.
+     *
+     * <p>NOTE(review): the class declares a {@code serialVersionUID}, so the factory is
+     * presumably expected to be serializable, yet it holds a {@link JdbcClientPool} - verify
+     * that the pool can actually be serialized.
+     */
+    private static class JdbcCatalogLockFactory implements LockFactory {
+
+        private static final long serialVersionUID = 1L;
+        private static final String IDENTIFIER = "jdbc";
+
+        private final JdbcClientPool connections;
+        private final String catalogName;
+        private final Map<String, String> conf;
+
+        public JdbcCatalogLockFactory(
+                JdbcClientPool connections, String catalogName, Map<String, String> conf) {
+            this.connections = connections;
+            this.catalogName = catalogName;
+            this.conf = conf;
+        }
+
+        @Override
+        public String identifier() {
+            return IDENTIFIER;
+        }
+
+        @Override
+        public CatalogLock create(LockContext context) {
+            long maxSleepMillis = checkMaxSleep(conf);
+            long timeoutMillis = acquireTimeout(conf);
+            return new JdbcCatalogLock(connections, catalogName, maxSleepMillis, timeoutMillis);
+        }
+    }
+
+    /**
+     * Resolves the {@code LOCK_CHECK_MAX_SLEEP} option from {@code conf}, falling back to the
+     * option's default, and returns it in milliseconds.
+     */
+    public static long checkMaxSleep(Map<String, String> conf) {
+        String fallback = TimeUtils.getStringInMillis(LOCK_CHECK_MAX_SLEEP.defaultValue());
+        String configured = conf.getOrDefault(LOCK_CHECK_MAX_SLEEP.key(), fallback);
+        return TimeUtils.parseDuration(configured).toMillis();
+    }
+
+    /**
+     * Resolves the {@code LOCK_ACQUIRE_TIMEOUT} option from {@code conf}, falling back to the
+     * option's default, and returns it in milliseconds.
+     */
+    public static long acquireTimeout(Map<String, String> conf) {
+        String fallback = TimeUtils.getStringInMillis(LOCK_ACQUIRE_TIMEOUT.defaultValue());
+        String configured = conf.getOrDefault(LOCK_ACQUIRE_TIMEOUT.key(), fallback);
+        return TimeUtils.parseDuration(configured).toMillis();
+    }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogOptions.java
b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogOptions.java
new file mode 100644
index 000000000..dd4afd473
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogOptions.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.jdbc;
+
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.ConfigOptions;
+
+/** Configuration options that are specific to the jdbc catalog. */
+public final class JdbcCatalogOptions {
+
+    /**
+     * Store key of the catalog; {@code JdbcCatalogFactory} reads it from the catalog options.
+     * The option is declared with a {@code null} default.
+     */
+    public static final ConfigOption<String> CATALOG_KEY =
+            ConfigOptions.key("catalog-key")
+                    .stringType()
+                    .defaultValue(null)
+                    .withDescription("Custom jdbc catalog store key.");
+
+    /** Not instantiable: this class only holds option constants. */
+    private JdbcCatalogOptions() {}
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcClientPool.java
b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcClientPool.java
new file mode 100644
index 000000000..e1a4cccf1
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcClientPool.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.jdbc;
+
+import org.apache.paimon.client.ClientPoolImpl;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.SQLNonTransientConnectionException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * {@link ClientPoolImpl} whose clients are JDBC {@link Connection}s opened through {@link
+ * DriverManager}.
+ */
+public class JdbcClientPool extends ClientPoolImpl<Connection, SQLException> {
+
+    /** Matches {@code jdbc:<protocol>:<rest>}; group 1 is the protocol. */
+    private static final Pattern PROTOCOL_PATTERN = Pattern.compile("jdbc:([^:]+):(.*)");
+
+    private final String dbUrl;
+    private final Map<String, String> properties;
+    private final String protocol;
+
+    /**
+     * @param poolSize pool size handed to {@link ClientPoolImpl}
+     * @param dbUrl JDBC url; must match {@code jdbc:<protocol>:<rest>}
+     * @param props catalog properties; entries starting with {@code JdbcCatalog.PROPERTY_PREFIX}
+     *     are passed to the driver when a connection is opened
+     * @throws RuntimeException if {@code dbUrl} does not match the expected shape
+     */
+    public JdbcClientPool(int poolSize, String dbUrl, Map<String, String> props) {
+        super(poolSize, SQLNonTransientConnectionException.class, true);
+        Matcher urlMatcher = PROTOCOL_PATTERN.matcher(dbUrl);
+        if (!urlMatcher.matches()) {
+            throw new RuntimeException("Invalid Jdbc url: " + dbUrl);
+        }
+        this.protocol = urlMatcher.group(1);
+        this.dbUrl = dbUrl;
+        this.properties = props;
+    }
+
+    /** Opens a new connection to {@code dbUrl} with the prefix-stripped driver properties. */
+    @Override
+    protected Connection newClient() {
+        Properties connectionProperties =
+                JdbcUtils.extractJdbcConfiguration(properties, JdbcCatalog.PROPERTY_PREFIX);
+        try {
+            return DriverManager.getConnection(dbUrl, connectionProperties);
+        } catch (SQLException e) {
+            throw new RuntimeException(String.format("Failed to connect: %s", dbUrl), e);
+        }
+    }
+
+    /**
+     * Closes the given connection and opens a fresh one.
+     *
+     * <p>NOTE(review): a failure while closing the old connection propagates as a {@link
+     * RuntimeException}, so no replacement is created in that case - confirm this is intended.
+     */
+    @Override
+    protected Connection reconnect(Connection client) {
+        close(client);
+        return newClient();
+    }
+
+    /** Returns the protocol part of the JDBC url, e.g. {@code sqlite} for {@code jdbc:sqlite:...}. */
+    public String getProtocol() {
+        return protocol;
+    }
+
+    @Override
+    protected void close(Connection client) {
+        try {
+            client.close();
+        } catch (SQLException e) {
+            throw new RuntimeException("Failed to close connection", e);
+        }
+    }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcDistributedLockDialect.java
b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcDistributedLockDialect.java
new file mode 100644
index 000000000..a691aac22
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcDistributedLockDialect.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.jdbc;
+
+import java.sql.SQLException;
+
+/**
+ * Database-specific operations of the jdbc distributed lock. {@link JdbcUtils} obtains an
+ * implementation from {@code DistributedLockDialectFactory} based on the pool's protocol.
+ */
+public interface JdbcDistributedLockDialect {
+
+    /** Creates the table that stores the lock rows. */
+    void createTable(JdbcClientPool connections) throws SQLException, InterruptedException;
+
+    /**
+     * Tries to take the lock {@code lockId}.
+     *
+     * @param timeoutMillSeconds timeout supplied by the caller; presumably milliseconds, as the
+     *     name suggests - confirm in the implementation
+     * @return whether the lock was acquired
+     */
+    boolean lockAcquire(JdbcClientPool connections, String lockId, long timeoutMillSeconds)
+            throws SQLException, InterruptedException;
+
+    /** Releases the lock {@code lockId}. */
+    boolean releaseLock(JdbcClientPool connections, String lockId)
+            throws SQLException, InterruptedException;
+
+    /**
+     * Removes the lock {@code lockId} if it has timed out.
+     *
+     * @return the number of affected lock rows
+     */
+    int tryReleaseTimedOutLock(JdbcClientPool connections, String lockId)
+            throws SQLException, InterruptedException;
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java
b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java
new file mode 100644
index 000000000..7b9b93a5a
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.jdbc;
+
+import org.apache.paimon.catalog.Identifier;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.SQLIntegrityConstraintViolationException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+
+/**
+ * Util for jdbc catalog: names of the backing tables and columns, the SQL statements run against
+ * them, and helpers that execute those statements on a {@link JdbcClientPool}.
+ */
+public class JdbcUtils {
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcUtils.class);
+
+    // Catalog table: one row per (catalog key, database, table).
+    public static final String CATALOG_TABLE_NAME = "paimon_tables";
+    public static final String CATALOG_KEY = "catalog_key";
+    public static final String TABLE_DATABASE = "database_name";
+    public static final String TABLE_NAME = "table_name";
+
+    static final String CREATE_CATALOG_TABLE =
+            "CREATE TABLE " + CATALOG_TABLE_NAME + "("
+                    + CATALOG_KEY + " VARCHAR(255) NOT NULL,"
+                    + TABLE_DATABASE + " VARCHAR(255) NOT NULL,"
+                    + TABLE_NAME + " VARCHAR(255) NOT NULL,"
+                    + " PRIMARY KEY (" + CATALOG_KEY + ", " + TABLE_DATABASE + ", " + TABLE_NAME
+                    + ")"
+                    + ")";
+    static final String GET_TABLE_SQL =
+            "SELECT * FROM " + CATALOG_TABLE_NAME
+                    + " WHERE " + CATALOG_KEY + " = ? AND "
+                    + TABLE_DATABASE + " = ? AND "
+                    + TABLE_NAME + " = ? ";
+    static final String LIST_TABLES_SQL =
+            "SELECT * FROM " + CATALOG_TABLE_NAME
+                    + " WHERE " + CATALOG_KEY + " = ? AND "
+                    + TABLE_DATABASE + " = ?";
+
+    static final String DELETE_TABLES_SQL =
+            "DELETE FROM " + CATALOG_TABLE_NAME
+                    + " WHERE " + CATALOG_KEY + " = ? AND "
+                    + TABLE_DATABASE + " = ?";
+    static final String RENAME_TABLE_SQL =
+            "UPDATE " + CATALOG_TABLE_NAME
+                    + " SET " + TABLE_DATABASE + " = ? , " + TABLE_NAME + " = ? "
+                    + " WHERE " + CATALOG_KEY + " = ? AND "
+                    + TABLE_DATABASE + " = ? AND "
+                    + TABLE_NAME + " = ? ";
+    static final String DROP_TABLE_SQL =
+            "DELETE FROM " + CATALOG_TABLE_NAME
+                    + " WHERE " + CATALOG_KEY + " = ? AND "
+                    + TABLE_DATABASE + " = ? AND "
+                    + TABLE_NAME + " = ? ";
+    static final String GET_DATABASE_SQL =
+            "SELECT " + TABLE_DATABASE
+                    + " FROM " + CATALOG_TABLE_NAME
+                    + " WHERE " + CATALOG_KEY + " = ? AND "
+                    + TABLE_DATABASE + " = ? LIMIT 1";
+
+    static final String LIST_ALL_TABLE_DATABASES_SQL =
+            "SELECT DISTINCT " + TABLE_DATABASE
+                    + " FROM " + CATALOG_TABLE_NAME
+                    + " WHERE " + CATALOG_KEY + " = ?";
+    static final String DO_COMMIT_CREATE_TABLE_SQL =
+            "INSERT INTO " + CATALOG_TABLE_NAME
+                    + " (" + CATALOG_KEY + ", " + TABLE_DATABASE + ", " + TABLE_NAME + ") "
+                    + " VALUES (?,?,?)";
+
+    // Catalog database Properties
+    static final String DATABASE_PROPERTIES_TABLE_NAME = "paimon_database_properties";
+    static final String DATABASE_NAME = "database_name";
+    static final String DATABASE_PROPERTY_KEY = "property_key";
+    static final String DATABASE_PROPERTY_VALUE = "property_value";
+
+    static final String CREATE_DATABASE_PROPERTIES_TABLE =
+            "CREATE TABLE " + DATABASE_PROPERTIES_TABLE_NAME + "("
+                    + CATALOG_KEY + " VARCHAR(255) NOT NULL,"
+                    + DATABASE_NAME + " VARCHAR(255) NOT NULL,"
+                    + DATABASE_PROPERTY_KEY + " VARCHAR(255),"
+                    + DATABASE_PROPERTY_VALUE + " VARCHAR(1000),"
+                    + "PRIMARY KEY (" + CATALOG_KEY + ", " + DATABASE_NAME + ", "
+                    + DATABASE_PROPERTY_KEY + ")"
+                    + ")";
+    static final String GET_DATABASE_PROPERTIES_SQL =
+            "SELECT " + DATABASE_NAME
+                    + " FROM " + DATABASE_PROPERTIES_TABLE_NAME
+                    + " WHERE " + CATALOG_KEY + " = ? AND "
+                    + DATABASE_NAME + " = ? ";
+    static final String INSERT_DATABASE_PROPERTIES_SQL =
+            "INSERT INTO " + DATABASE_PROPERTIES_TABLE_NAME
+                    + " (" + CATALOG_KEY + ", " + DATABASE_NAME + ", " + DATABASE_PROPERTY_KEY
+                    + ", " + DATABASE_PROPERTY_VALUE
+                    + ") VALUES ";
+    static final String INSERT_PROPERTIES_VALUES_BASE = "(?,?,?,?)";
+    static final String GET_ALL_DATABASE_PROPERTIES_SQL =
+            "SELECT * "
+                    + " FROM " + DATABASE_PROPERTIES_TABLE_NAME
+                    + " WHERE " + CATALOG_KEY + " = ? AND "
+                    + DATABASE_NAME + " = ? ";
+    static final String DELETE_ALL_DATABASE_PROPERTIES_SQL =
+            "DELETE FROM " + DATABASE_PROPERTIES_TABLE_NAME
+                    + " WHERE " + CATALOG_KEY + " = ? AND "
+                    + DATABASE_NAME + " = ?";
+    static final String LIST_ALL_PROPERTY_DATABASES_SQL =
+            "SELECT DISTINCT " + DATABASE_NAME
+                    + " FROM " + DATABASE_PROPERTIES_TABLE_NAME
+                    + " WHERE " + CATALOG_KEY + " = ?";
+
+    // Distributed locks table
+    static final String DISTRIBUTED_LOCKS_TABLE_NAME = "paimon_distributed_locks";
+    static final String LOCK_ID = "lock_id";
+    static final String ACQUIRED_AT = "acquired_at";
+    static final String EXPIRE_TIME = "expire_time_seconds";
+
+    /**
+     * Copies every entry whose key starts with {@code prefix} into a new {@link Properties}
+     * object, with the prefix removed from the key.
+     *
+     * @param properties source entries
+     * @param prefix key prefix that selects the entries to copy
+     * @return the selected entries under their prefix-stripped keys
+     */
+    public static Properties extractJdbcConfiguration(
+            Map<String, String> properties, String prefix) {
+        Properties result = new Properties();
+        properties.forEach(
+                (key, value) -> {
+                    if (key.startsWith(prefix)) {
+                        result.put(key.substring(prefix.length()), value);
+                    }
+                });
+        return result;
+    }
+
+    /**
+     * Get paimon table metadata.
+     *
+     * @return a map holding the catalog key, database name and table name of the matching row,
+     *     or an empty map when no row matches
+     */
+    public static Map<String, String> getTable(
+            JdbcClientPool connections, String storeKey, String databaseName, String tableName)
+            throws SQLException, InterruptedException {
+        return connections.run(
+                conn -> {
+                    Map<String, String> table = Maps.newHashMap();
+
+                    try (PreparedStatement sql = conn.prepareStatement(JdbcUtils.GET_TABLE_SQL)) {
+                        sql.setString(1, storeKey);
+                        sql.setString(2, databaseName);
+                        sql.setString(3, tableName);
+                        // Close the result set even when reading a column fails.
+                        try (ResultSet rs = sql.executeQuery()) {
+                            if (rs.next()) {
+                                table.put(CATALOG_KEY, rs.getString(CATALOG_KEY));
+                                table.put(TABLE_DATABASE, rs.getString(TABLE_DATABASE));
+                                table.put(TABLE_NAME, rs.getString(TABLE_NAME));
+                            }
+                        }
+                    }
+                    return table;
+                });
+    }
+
+    /**
+     * Renames a table row from {@code fromTable} to {@code toTable}.
+     *
+     * @throws RuntimeException if the target already exists (constraint violation), if no row
+     *     matched {@code fromTable}, or if the statement fails for another reason
+     */
+    public static void updateTable(
+            JdbcClientPool connections, String storeKey, Identifier fromTable, Identifier toTable) {
+        int updatedRecords =
+                execute(
+                        err -> {
+                            // Besides the typed exception, drivers may only report the
+                            // violation in the message text.
+                            if (err instanceof SQLIntegrityConstraintViolationException
+                                    || (err.getMessage() != null
+                                            && err.getMessage().contains("constraint failed"))) {
+                                throw new RuntimeException(
+                                        String.format("Table already exists: %s", toTable));
+                            }
+                        },
+                        connections,
+                        JdbcUtils.RENAME_TABLE_SQL,
+                        toTable.getDatabaseName(),
+                        toTable.getObjectName(),
+                        storeKey,
+                        fromTable.getDatabaseName(),
+                        fromTable.getObjectName());
+
+        if (updatedRecords == 1) {
+            LOG.info("Renamed table from {}, to {}", fromTable, toTable);
+        } else if (updatedRecords == 0) {
+            throw new RuntimeException(String.format("Table does not exist: %s", fromTable));
+        } else {
+            LOG.warn(
+                    "Rename operation affected {} rows: the catalog table's primary key assumption has been violated",
+                    updatedRecords);
+        }
+    }
+
+    /**
+     * Returns whether the database is known, i.e. it has at least one row in the catalog table
+     * or in the database properties table.
+     */
+    public static boolean databaseExists(
+            JdbcClientPool connections, String storeKey, String databaseName) {
+        return exists(connections, JdbcUtils.GET_DATABASE_SQL, storeKey, databaseName)
+                || exists(
+                        connections,
+                        JdbcUtils.GET_DATABASE_PROPERTIES_SQL,
+                        storeKey,
+                        databaseName);
+    }
+
+    /** Returns whether the catalog table holds a row for the given table. */
+    public static boolean tableExists(
+            JdbcClientPool connections, String storeKey, String databaseName, String tableName) {
+        return exists(connections, JdbcUtils.GET_TABLE_SQL, storeKey, databaseName, tableName);
+    }
+
+    /** Runs the query with {@code args} bound in order and reports whether it returns a row. */
+    @SuppressWarnings("checkstyle:NestedTryDepth")
+    private static boolean exists(JdbcClientPool connections, String sql, String... args) {
+        try {
+            return connections.run(
+                    conn -> {
+                        try (PreparedStatement preparedStatement = conn.prepareStatement(sql)) {
+                            for (int index = 0; index < args.length; index++) {
+                                preparedStatement.setString(index + 1, args[index]);
+                            }
+                            try (ResultSet rs = preparedStatement.executeQuery()) {
+                                return rs.next();
+                            }
+                        }
+                    });
+        } catch (SQLException e) {
+            throw new RuntimeException(String.format("Failed to execute exists query: %s", sql), e);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Interrupted in SQL query", e);
+        }
+    }
+
+    /** Runs an update statement without a custom SQL error handler. */
+    public static int execute(JdbcClientPool connections, String sql, String... args) {
+        return execute(err -> {}, connections, sql, args);
+    }
+
+    /**
+     * Runs an update statement with {@code args} bound in order as string parameters.
+     *
+     * @param sqlErrorHandler called with the {@link SQLException} before it is wrapped; it may
+     *     throw a more specific exception instead
+     * @return the update count reported by the statement
+     * @throws RuntimeException if the statement fails or the calling thread is interrupted
+     */
+    public static int execute(
+            Consumer<SQLException> sqlErrorHandler,
+            JdbcClientPool connections,
+            String sql,
+            String... args) {
+        try {
+            return connections.run(
+                    conn -> {
+                        try (PreparedStatement preparedStatement = conn.prepareStatement(sql)) {
+                            for (int pos = 0; pos < args.length; pos++) {
+                                preparedStatement.setString(pos + 1, args[pos]);
+                            }
+                            return preparedStatement.executeUpdate();
+                        }
+                    });
+        } catch (SQLException e) {
+            sqlErrorHandler.accept(e);
+            throw new RuntimeException(String.format("Failed to execute: %s", sql), e);
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so callers further up can still observe it.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Interrupted in SQL command", e);
+        }
+    }
+
+    /**
+     * Inserts one property row per map entry for the given database with a single statement.
+     *
+     * @return {@code true} when every entry was inserted; an empty map inserts nothing and
+     *     also returns {@code true}
+     * @throws IllegalStateException if fewer rows than entries were inserted
+     */
+    public static boolean insertProperties(
+            JdbcClientPool connections,
+            String storeKey,
+            String databaseName,
+            Map<String, String> properties) {
+        if (properties.isEmpty()) {
+            // Without any value tuple the statement would end in "VALUES " and be rejected.
+            return true;
+        }
+        String[] args =
+                properties.entrySet().stream()
+                        .flatMap(
+                                entry ->
+                                        Stream.of(
+                                                storeKey,
+                                                databaseName,
+                                                entry.getKey(),
+                                                entry.getValue()))
+                        .toArray(String[]::new);
+
+        int insertedRecords =
+                execute(connections, JdbcUtils.insertPropertiesStatement(properties.size()), args);
+        if (insertedRecords == properties.size()) {
+            return true;
+        }
+        throw new IllegalStateException(
+                String.format(
+                        "Failed to insert: %d of %d succeeded",
+                        insertedRecords, properties.size()));
+    }
+
+    /** Builds the multi-row insert statement with {@code size} comma-separated value tuples. */
+    private static String insertPropertiesStatement(int size) {
+        StringBuilder sqlStatement = new StringBuilder(JdbcUtils.INSERT_DATABASE_PROPERTIES_SQL);
+        for (int i = 0; i < size; i++) {
+            if (i != 0) {
+                sqlStatement.append(", ");
+            }
+            sqlStatement.append(JdbcUtils.INSERT_PROPERTIES_VALUES_BASE);
+        }
+        return sqlStatement.toString();
+    }
+
+    /** Creates the distributed lock table using the dialect for the pool's protocol. */
+    public static void createDistributedLockTable(JdbcClientPool connections)
+            throws SQLException, InterruptedException {
+        DistributedLockDialectFactory.create(connections.getProtocol()).createTable(connections);
+    }
+
+    /**
+     * Clears a timed-out lock record for {@code lockId}, if any, and then tries to take the
+     * lock.
+     *
+     * @return the result of the dialect's {@code lockAcquire} call
+     */
+    public static boolean acquire(
+            JdbcClientPool connections, String lockId, long timeoutMillSeconds)
+            throws SQLException, InterruptedException {
+        JdbcDistributedLockDialect distributedLockDialect =
+                DistributedLockDialectFactory.create(connections.getProtocol());
+        // Check and clear expire lock.
+        int affectedRows = distributedLockDialect.tryReleaseTimedOutLock(connections, lockId);
+        if (affectedRows > 0) {
+            LOG.debug("Successfully cleared {} lock records", affectedRows);
+        }
+        return distributedLockDialect.lockAcquire(connections, lockId, timeoutMillSeconds);
+    }
+
+    /** Releases the lock {@code lockId} using the dialect for the pool's protocol. */
+    public static void release(JdbcClientPool connections, String lockId)
+            throws SQLException, InterruptedException {
+        DistributedLockDialectFactory.create(connections.getProtocol())
+                .releaseLock(connections, lockId);
+    }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/jdbc/MysqlDistributedLockDialect.java
b/paimon-core/src/main/java/org/apache/paimon/jdbc/MysqlDistributedLockDialect.java
new file mode 100644
index 000000000..206aa8cd7
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/jdbc/MysqlDistributedLockDialect.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.jdbc;
+
+/** Distributed lock implementation based on mysql table. */
+public class MysqlDistributedLockDialect extends
AbstractDistributedLockDialect {
+
+ @Override
+ public String getCreateTableSql() {
+ return "CREATE TABLE "
+ + JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME
+ + "("
+ + JdbcUtils.LOCK_ID
+ + " VARCHAR(1000) NOT NULL,"
+ + JdbcUtils.ACQUIRED_AT
+ + " TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,"
+ + JdbcUtils.EXPIRE_TIME
+ + " BIGINT DEFAULT 0 NOT NULL,"
+ + "PRIMARY KEY ("
+ + JdbcUtils.LOCK_ID
+ + ")"
+ + ")";
+ }
+
+ @Override
+ public String getLockAcquireSql() {
+ return "INSERT INTO "
+ + JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME
+ + " ("
+ + JdbcUtils.LOCK_ID
+ + ","
+ + JdbcUtils.EXPIRE_TIME
+ + ") VALUES (?,?)";
+ }
+
+ @Override
+ public String getReleaseLockSql() {
+ return "DELETE FROM "
+ + JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME
+ + " WHERE "
+ + JdbcUtils.LOCK_ID
+ + " = ?";
+ }
+
+ @Override
+ public String getTryReleaseTimedOutLock() {
+ return "DELETE FROM "
+ + JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME
+ + " WHERE TIMESTAMPDIFF(SECOND, "
+ + JdbcUtils.ACQUIRED_AT
+ + ", NOW()) >"
+ + JdbcUtils.EXPIRE_TIME
+ + " and "
+ + JdbcUtils.LOCK_ID
+ + " = ?";
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/jdbc/SqlLiteDistributedLockDialect.java
b/paimon-core/src/main/java/org/apache/paimon/jdbc/SqlLiteDistributedLockDialect.java
new file mode 100644
index 000000000..602fdd1d6
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/jdbc/SqlLiteDistributedLockDialect.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.jdbc;
+
+/** Distributed lock implementation based on sqlite table. */
+public class SqlLiteDistributedLockDialect extends
AbstractDistributedLockDialect {
+
+ @Override
+ public String getCreateTableSql() {
+ return "CREATE TABLE "
+ + JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME
+ + "("
+ + JdbcUtils.LOCK_ID
+ + " VARCHAR(1000) NOT NULL,"
+ + JdbcUtils.ACQUIRED_AT
+ + " TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,"
+ + JdbcUtils.EXPIRE_TIME
+ + " BIGINT DEFAULT 0 NOT NULL,"
+ + "PRIMARY KEY ("
+ + JdbcUtils.LOCK_ID
+ + ")"
+ + ")";
+ }
+
+ @Override
+ public String getLockAcquireSql() {
+ return "INSERT INTO "
+ + JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME
+ + " ("
+ + JdbcUtils.LOCK_ID
+ + ","
+ + JdbcUtils.EXPIRE_TIME
+ + ") VALUES (?,?)";
+ }
+
+ @Override
+ public String getReleaseLockSql() {
+ return "DELETE FROM "
+ + JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME
+ + " WHERE "
+ + JdbcUtils.LOCK_ID
+ + " = ?";
+ }
+
+ @Override
+ public String getTryReleaseTimedOutLock() {
+ return "DELETE FROM "
+ + JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME
+ + " WHERE strftime('%s', 'now') - strftime('%s', "
+ + JdbcUtils.ACQUIRED_AT
+ + ") > "
+ + JdbcUtils.EXPIRE_TIME
+ + " and "
+ + JdbcUtils.LOCK_ID
+ + " = ?";
+ }
+}
diff --git
a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 485ba5531..cc2b3f063 100644
---
a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++
b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -14,3 +14,4 @@
# limitations under the License.
org.apache.paimon.catalog.FileSystemCatalogFactory
+org.apache.paimon.jdbc.JdbcCatalogFactory
diff --git
a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
new file mode 100644
index 000000000..a9a225cf1
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.jdbc;
+
+import org.apache.paimon.catalog.CatalogTestBase;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.options.CatalogOptions;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Tests for {@link JdbcCatalog}, run against an in-memory SQLite database with catalog locking
+ * enabled.
+ */
+public class JdbcCatalogTest extends CatalogTestBase {
+
+    /** Runs the base set-up, then replaces {@code catalog} with a fresh {@link JdbcCatalog}. */
+    @BeforeEach
+    public void setUp() throws Exception {
+        super.setUp();
+        catalog = initCatalog("test-jdbc-catalog", Maps.newHashMap());
+    }
+
+    /**
+     * Builds a {@link JdbcCatalog} on an in-memory SQLite URI with locking enabled.
+     *
+     * @param storeKey passed to the {@link JdbcCatalog} constructor as its second argument
+     * @param props extra properties; applied last, so they override the defaults set here
+     * @return the newly created catalog
+     */
+    private JdbcCatalog initCatalog(String storeKey, Map<String, String> props) {
+        Map<String, String> properties = Maps.newHashMap();
+        // A random suffix makes the URI differ per call, presumably so that tests do not share
+        // one in-memory database.
+        properties.put(
+                CatalogOptions.URI.key(),
+                "jdbc:sqlite:file::memory:?ic" + UUID.randomUUID().toString().replace("-", ""));
+
+        properties.put(JdbcCatalog.PROPERTY_PREFIX + "username", "user");
+        properties.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password");
+        properties.put(CatalogOptions.WAREHOUSE.key(), warehouse);
+        properties.put(CatalogOptions.LOCK_ENABLED.key(), "true");
+        properties.putAll(props);
+        return new JdbcCatalog(fileIO, storeKey, properties, warehouse);
+    }
+
+    /** A second acquire of the same lock id must fail while the first one is still held. */
+    @Test
+    public void testAcquireLockFail() throws SQLException, InterruptedException {
+        String lockId = "jdbc.testDb.testTable";
+        assertThat(JdbcUtils.acquire(((JdbcCatalog) catalog).getConnections(), lockId, 3000))
+                .isTrue();
+        assertThat(JdbcUtils.acquire(((JdbcCatalog) catalog).getConnections(), lockId, 3000))
+                .isFalse();
+    }
+
+    /**
+     * After waiting longer than the lock's timeout, the same lock id can be acquired again.
+     *
+     * <p>The timeout argument is 1000 (presumably milliseconds) and the test sleeps for 2000 ms
+     * before retrying.
+     */
+    @Test
+    public void testCleanTimeoutLockAndAcquireLock() throws SQLException, InterruptedException {
+        String lockId = "jdbc.testDb.testTable";
+        assertThat(JdbcUtils.acquire(((JdbcCatalog) catalog).getConnections(), lockId, 1000))
+                .isTrue();
+        Thread.sleep(2000);
+        assertThat(JdbcUtils.acquire(((JdbcCatalog) catalog).getConnections(), lockId, 1000))
+                .isTrue();
+    }
+
+    /** Overrides the base test: a freshly created JDBC catalog lists no databases. */
+    @Test
+    @Override
+    public void testListDatabasesWhenNoDatabases() {
+        List<String> databases = catalog.listDatabases();
+        assertThat(databases).isEqualTo(new ArrayList<>());
+    }
+
+    /**
+     * Creating a table whose database name or table name contains upper-case characters must be
+     * rejected with an {@link IllegalArgumentException} carrying the expected message.
+     */
+    @Test
+    public void testCheckIdentifierUpperCase() throws Exception {
+        catalog.createDatabase("test_db", false);
+        assertThatThrownBy(
+                        () ->
+                                catalog.createTable(
+                                        Identifier.create("TEST_DB", "new_table"),
+                                        DEFAULT_TABLE_SCHEMA,
+                                        false))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage("Database name [TEST_DB] cannot contain upper case in the catalog.");
+
+        assertThatThrownBy(
+                        () ->
+                                catalog.createTable(
+                                        Identifier.create("test_db", "NEW_TABLE"),
+                                        DEFAULT_TABLE_SCHEMA,
+                                        false))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage("Table name [NEW_TABLE] cannot contain upper case in the catalog.");
+    }
+}
diff --git a/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java b/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java
index bef414915..37d8661d2 100644
--- a/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java
+++ b/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java
@@ -76,6 +76,7 @@ public class ConfigOptionsDocGenerator {
new OptionsClassLocation("paimon-common", "org.apache.paimon"),
new OptionsClassLocation("paimon-core",
"org.apache.paimon.lookup"),
new OptionsClassLocation("paimon-core",
"org.apache.paimon.catalog"),
+ new OptionsClassLocation("paimon-core",
"org.apache.paimon.jdbc"),
new OptionsClassLocation(
"paimon-flink/paimon-flink-common",
"org.apache.paimon.flink"),
new OptionsClassLocation(