This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 85843e9bb [core] Support common jdbc catalog lock for filesystem catalog. (#3076)
85843e9bb is described below

commit 85843e9bb5d270c9ef465cad46d08659dcc0d038
Author: Xiaojian Sun <[email protected]>
AuthorDate: Tue Mar 26 20:31:09 2024 +0800

    [core] Support common jdbc catalog lock for filesystem catalog. (#3076)
---
 docs/content/how-to/creating-catalogs.md           | 10 +++
 .../apache/paimon/catalog/FileSystemCatalog.java   | 16 +++-
 .../apache/paimon/catalog/LockContextUtils.java    | 98 ++++++++++++++++++++++
 .../org/apache/paimon/jdbc/JdbcCatalogLock.java    |  3 +-
 .../paimon/catalog/FileSystemCatalogTest.java      | 68 +++++++++++++++
 5 files changed, 193 insertions(+), 2 deletions(-)

diff --git a/docs/content/how-to/creating-catalogs.md b/docs/content/how-to/creating-catalogs.md
index 536a6b165..fc2927d41 100644
--- a/docs/content/how-to/creating-catalogs.md
+++ b/docs/content/how-to/creating-catalogs.md
@@ -53,6 +53,16 @@ USE CATALOG my_catalog;
 
 You can define any default table options with the prefix `table-default.` for tables created in the catalog.
 
+The FileSystem catalog supports jdbc lock and can take effect through the following configuration:
+
+> ```shell
+> 'uri' = 'jdbc:mysql://<host>:<port>/<databaseName>'
+> 'jdbc.user' = '...',
+> 'jdbc.password' = '...',
+> ```
+
+
+
 {{< /tab >}}
 
 {{< tab "Spark3" >}}
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
index e71c92dc4..2c1d1e8df 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.catalog;
 
+import org.apache.paimon.client.ClientPool;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
@@ -35,6 +36,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.Callable;
 
 import static org.apache.paimon.catalog.FileSystemCatalogOptions.CASE_SENSITIVE;
@@ -46,6 +48,8 @@ public class FileSystemCatalog extends AbstractCatalog {
 
     private final Path warehouse;
 
+    private ClientPool.ClientPoolImpl clientPool;
+
     public FileSystemCatalog(FileIO fileIO, Path warehouse) {
         super(fileIO);
         this.warehouse = warehouse;
@@ -156,6 +160,14 @@ public class FileSystemCatalog extends AbstractCatalog {
                 .withLock(catalogLock == null ? null : Lock.fromCatalog(catalogLock, identifier));
     }
 
+    @Override
+    public Optional<CatalogLock.LockContext> lockContext() {
+        if (clientPool == null) {
+            this.clientPool = LockContextUtils.tryInitializeClientPool(catalogOptions);
+        }
+        return LockContextUtils.lockContext(this.clientPool, catalogOptions, "filesystem");
+    }
+
     @Override
     public void renameTableImpl(Identifier fromTable, Identifier toTable) {
         Path fromPath = getDataTableLocation(fromTable);
@@ -187,7 +199,9 @@ public class FileSystemCatalog extends AbstractCatalog {
     }
 
     @Override
-    public void close() throws Exception {}
+    public void close() throws Exception {
+        LockContextUtils.close(clientPool);
+    }
 
     @Override
     public String warehouse() {
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/LockContextUtils.java b/paimon-core/src/main/java/org/apache/paimon/catalog/LockContextUtils.java
new file mode 100644
index 000000000..699c54de4
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/LockContextUtils.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.catalog;
+
+import org.apache.paimon.client.ClientPool;
+import org.apache.paimon.jdbc.JdbcCatalogFactory;
+import org.apache.paimon.jdbc.JdbcCatalogLock;
+import org.apache.paimon.jdbc.JdbcClientPool;
+import org.apache.paimon.jdbc.JdbcUtils;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.Options;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+import java.util.Optional;
+
+/** Utils for {@link org.apache.paimon.catalog.CatalogLock.LockContext}. */
+public class LockContextUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FileSystemCatalog.class);
+
+    public static Optional<CatalogLock.LockContext> lockContext(
+            ClientPool.ClientPoolImpl clientPool, Options catalogOptions, String catalogKey) {
+        if (clientPool == null) {
+            return Optional.of(new AbstractCatalog.OptionLockContext(catalogOptions));
+        }
+        String lockType = catalogOptions.get(CatalogOptions.LOCK_TYPE);
+        switch (lockType) {
+            case JdbcCatalogFactory.IDENTIFIER:
+                JdbcClientPool connections = (JdbcClientPool) clientPool;
+                return Optional.of(
+                        new JdbcCatalogLock.JdbcLockContext(
+                                connections, catalogKey, catalogOptions));
+            default:
+                LOG.warn("Unsupported lock type:" + lockType);
+                return Optional.of(new AbstractCatalog.OptionLockContext(catalogOptions));
+        }
+    }
+
+    public static ClientPool.ClientPoolImpl tryInitializeClientPool(Options catalogOptions) {
+        String lockType = catalogOptions.get(CatalogOptions.LOCK_TYPE);
+        if (lockType == null) {
+            return null;
+        }
+        switch (lockType) {
+            case JdbcCatalogFactory.IDENTIFIER:
+                JdbcClientPool connections =
+                        new JdbcClientPool(
+                                catalogOptions.get(CatalogOptions.CLIENT_POOL_SIZE),
+                                catalogOptions.get(CatalogOptions.URI.key()),
+                                catalogOptions.toMap());
+                try {
+                    JdbcUtils.createDistributedLockTable(connections, catalogOptions);
+                } catch (SQLException e) {
+                    throw new RuntimeException("Cannot initialize JDBC distributed lock.", e);
+                } catch (InterruptedException e) {
+                    throw new RuntimeException("Interrupted in call to initialize", e);
+                }
+                return connections;
+            default:
+                LOG.warn("Unsupported lock type:" + lockType);
+                return null;
+        }
+    }
+
+    public static void close(ClientPool.ClientPoolImpl clientPool) {
+        if (clientPool == null) {
+            return;
+        }
+        if (clientPool instanceof JdbcClientPool) {
+            JdbcClientPool connections = (JdbcClientPool) clientPool;
+            if (!connections.isClosed()) {
+                connections.close();
+            }
+        } else {
+            clientPool.close();
+        }
+        clientPool = null;
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java
index d713feb7e..2d409f12a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java
@@ -109,7 +109,8 @@ public class JdbcCatalogLock implements CatalogLock {
         }
     }
 
-    static class JdbcLockContext implements LockContext {
+    /** Jdbc lock context. */
+    public static class JdbcLockContext implements LockContext {
         private final JdbcClientPool connections;
         private final String catalogKey;
         private final Options conf;
diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java
new file mode 100644
index 000000000..0948ab07c
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.catalog;
+
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.jdbc.JdbcCatalog;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.Options;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link FileSystemCatalog}. */
+public class FileSystemCatalogTest extends CatalogTestBase {
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        super.setUp();
+        catalog = initCatalog(Maps.newHashMap());
+    }
+
+    private FileSystemCatalog initCatalog(Map<String, String> props) {
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put(
+                CatalogOptions.URI.key(),
+                "jdbc:sqlite:file::memory:?ic" + UUID.randomUUID().toString().replace("-", ""));
+
+        properties.put(JdbcCatalog.PROPERTY_PREFIX + "username", "user");
+        properties.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password");
+        properties.put(CatalogOptions.WAREHOUSE.key(), warehouse);
+        properties.put(CatalogOptions.LOCK_ENABLED.key(), "true");
+        properties.put(CatalogOptions.LOCK_TYPE.key(), "jdbc");
+        properties.putAll(props);
+        FileSystemCatalog catalog =
+                new FileSystemCatalog(fileIO, new Path(warehouse), Options.fromMap(properties));
+        return catalog;
+    }
+
+    @Override
+    public void testListDatabasesWhenNoDatabases() {
+        List<String> databases = catalog.listDatabases();
+        assertThat(databases).isEqualTo(new ArrayList<>());
+    }
+}

Reply via email to