This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c1abbac6a [core] Fix license and minor for JdbcCatalog
c1abbac6a is described below

commit c1abbac6abceb3d5fb824f1028ee219a70148219
Author: Jingsong <[email protected]>
AuthorDate: Wed Mar 6 14:43:13 2024 +0800

    [core] Fix license and minor for JdbcCatalog
---
 LICENSE                                            |   2 +
 .../java/org/apache/paimon/client/ClientPool.java  | 143 ++++++++++++++++++-
 .../org/apache/paimon/client/ClientPoolImpl.java   | 155 ---------------------
 .../paimon/jdbc/DistributedLockDialectFactory.java |   3 +-
 .../java/org/apache/paimon/jdbc/JdbcCatalog.java   |  39 +++---
 .../org/apache/paimon/jdbc/JdbcClientPool.java     |   4 +-
 6 files changed, 166 insertions(+), 180 deletions(-)

diff --git a/LICENSE b/LICENSE
index 8ce241d3f..97c849402 100644
--- a/LICENSE
+++ b/LICENSE
@@ -237,6 +237,8 @@ 
paimon-service/paimon-service-client/src/main/java/org/apache/paimon/service/net
 
paimon-service/paimon-service-client/src/main/java/org/apache/paimon/service/network/NetworkServer.java
 from http://flink.apache.org/ version 1.17.0
 
+paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java
+paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
 paimon-core/src/main/java/org/apache/paimon/utils/ZOrderByteUtils.java
 paimon-core/src/test/java/org/apache/paimon/utils/TestZOrderByteUtil.java
 
paimon-hive/paimon-hive-common/src/test/java/org/apache/paimon/hive/TestHiveMetastore.java
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java 
b/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java
index f0e4f0741..deddf75a2 100644
--- a/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java
+++ b/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java
@@ -18,7 +18,20 @@
 
 package org.apache.paimon.client;
 
-/** Source: [core/src/main/java/org/apache/iceberg/ClientPool.java]. */
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.util.ArrayDeque;
+import java.util.Deque;
+
+import static org.apache.paimon.utils.Preconditions.checkState;
+
+/* This file is based on source code from the Iceberg Project 
(http://iceberg.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/** Client pool for using multiple clients to execute actions. */
 public interface ClientPool<C, E extends Exception> {
     /** Action interface for client. */
     interface Action<R, C, E extends Exception> {
@@ -28,4 +41,132 @@ public interface ClientPool<C, E extends Exception> {
     <R> R run(Action<R, C, E> action) throws E, InterruptedException;
 
     <R> R run(Action<R, C, E> action, boolean retry) throws E, 
InterruptedException;
+
+    /** Default implementation for {@link ClientPool}. */
+    abstract class ClientPoolImpl<C, E extends Exception> implements 
Closeable, ClientPool<C, E> {
+        private static final Logger LOG = 
LoggerFactory.getLogger(ClientPoolImpl.class);
+
+        private final int poolSize;
+        private final Deque<C> clients;
+        private final Class<? extends E> reconnectExc;
+        private final Object signal = new Object();
+        private final boolean retryByDefault;
+        private volatile int currentSize;
+        private boolean closed;
+
+        public ClientPoolImpl(
+                int poolSize, Class<? extends E> reconnectExc, boolean 
retryByDefault) {
+            this.poolSize = poolSize;
+            this.reconnectExc = reconnectExc;
+            this.clients = new ArrayDeque<>(poolSize);
+            this.currentSize = 0;
+            this.closed = false;
+            this.retryByDefault = retryByDefault;
+        }
+
+        @Override
+        public <R> R run(Action<R, C, E> action) throws E, 
InterruptedException {
+            return run(action, retryByDefault);
+        }
+
+        @Override
+        public <R> R run(Action<R, C, E> action, boolean retry) throws E, 
InterruptedException {
+            C client = get();
+            try {
+                return action.run(client);
+            } catch (Exception exc) {
+                if (retry && isConnectionException(exc)) {
+                    try {
+                        client = reconnect(client);
+                    } catch (Exception ignored) {
+                        // if reconnection throws any exception, rethrow the 
original failure
+                        throw reconnectExc.cast(exc);
+                    }
+
+                    return action.run(client);
+                }
+
+                throw exc;
+
+            } finally {
+                release(client);
+            }
+        }
+
+        protected abstract C newClient();
+
+        protected abstract C reconnect(C client);
+
+        protected boolean isConnectionException(Exception exc) {
+            return reconnectExc.isInstance(exc);
+        }
+
+        protected abstract void close(C client);
+
+        @Override
+        public void close() {
+            this.closed = true;
+            try {
+                while (currentSize > 0) {
+                    if (!clients.isEmpty()) {
+                        synchronized (this) {
+                            if (!clients.isEmpty()) {
+                                C client = clients.removeFirst();
+                                close(client);
+                                currentSize -= 1;
+                            }
+                        }
+                    }
+                    if (clients.isEmpty() && currentSize > 0) {
+                        synchronized (signal) {
+                            // wake every second in case this missed the signal
+                            signal.wait(1000);
+                        }
+                    }
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                LOG.warn(
+                        "Interrupted while shutting down pool. Some clients 
may not be closed.", e);
+            }
+        }
+
+        private C get() throws InterruptedException {
+            checkState(!closed, "Cannot get a client from a closed pool");
+            while (true) {
+                if (!clients.isEmpty() || currentSize < poolSize) {
+                    synchronized (this) {
+                        if (!clients.isEmpty()) {
+                            return clients.removeFirst();
+                        } else if (currentSize < poolSize) {
+                            C client = newClient();
+                            currentSize += 1;
+                            return client;
+                        }
+                    }
+                }
+                synchronized (signal) {
+                    // wake every second in case this missed the signal
+                    signal.wait(1000);
+                }
+            }
+        }
+
+        private void release(C client) {
+            synchronized (this) {
+                clients.addFirst(client);
+            }
+            synchronized (signal) {
+                signal.notify();
+            }
+        }
+
+        public int poolSize() {
+            return poolSize;
+        }
+
+        public boolean isClosed() {
+            return closed;
+        }
+    }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/client/ClientPoolImpl.java 
b/paimon-common/src/main/java/org/apache/paimon/client/ClientPoolImpl.java
deleted file mode 100644
index dda336be2..000000000
--- a/paimon-common/src/main/java/org/apache/paimon/client/ClientPoolImpl.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.client;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Closeable;
-import java.util.ArrayDeque;
-import java.util.Deque;
-
-import static org.apache.paimon.utils.Preconditions.checkState;
-
-/** Source: [core/src/main/java/org/apache/iceberg/ClientPoolImpl.java]. */
-public abstract class ClientPoolImpl<C, E extends Exception>
-        implements Closeable, ClientPool<C, E> {
-    private static final Logger LOG = 
LoggerFactory.getLogger(ClientPoolImpl.class);
-
-    private final int poolSize;
-    private final Deque<C> clients;
-    private final Class<? extends E> reconnectExc;
-    private final Object signal = new Object();
-    private final boolean retryByDefault;
-    private volatile int currentSize;
-    private boolean closed;
-
-    public ClientPoolImpl(int poolSize, Class<? extends E> reconnectExc, 
boolean retryByDefault) {
-        this.poolSize = poolSize;
-        this.reconnectExc = reconnectExc;
-        this.clients = new ArrayDeque<>(poolSize);
-        this.currentSize = 0;
-        this.closed = false;
-        this.retryByDefault = retryByDefault;
-    }
-
-    @Override
-    public <R> R run(Action<R, C, E> action) throws E, InterruptedException {
-        return run(action, retryByDefault);
-    }
-
-    @Override
-    public <R> R run(Action<R, C, E> action, boolean retry) throws E, 
InterruptedException {
-        C client = get();
-        try {
-            return action.run(client);
-        } catch (Exception exc) {
-            if (retry && isConnectionException(exc)) {
-                try {
-                    client = reconnect(client);
-                } catch (Exception ignored) {
-                    // if reconnection throws any exception, rethrow the 
original failure
-                    throw reconnectExc.cast(exc);
-                }
-
-                return action.run(client);
-            }
-
-            throw exc;
-
-        } finally {
-            release(client);
-        }
-    }
-
-    protected abstract C newClient();
-
-    protected abstract C reconnect(C client);
-
-    protected boolean isConnectionException(Exception exc) {
-        return reconnectExc.isInstance(exc);
-    }
-
-    protected abstract void close(C client);
-
-    @Override
-    public void close() {
-        this.closed = true;
-        try {
-            while (currentSize > 0) {
-                if (!clients.isEmpty()) {
-                    synchronized (this) {
-                        if (!clients.isEmpty()) {
-                            C client = clients.removeFirst();
-                            close(client);
-                            currentSize -= 1;
-                        }
-                    }
-                }
-                if (clients.isEmpty() && currentSize > 0) {
-                    synchronized (signal) {
-                        // wake every second in case this missed the signal
-                        signal.wait(1000);
-                    }
-                }
-            }
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            LOG.warn("Interrupted while shutting down pool. Some clients may 
not be closed.", e);
-        }
-    }
-
-    private C get() throws InterruptedException {
-        checkState(!closed, "Cannot get a client from a closed pool");
-        while (true) {
-            if (!clients.isEmpty() || currentSize < poolSize) {
-                synchronized (this) {
-                    if (!clients.isEmpty()) {
-                        return clients.removeFirst();
-                    } else if (currentSize < poolSize) {
-                        C client = newClient();
-                        currentSize += 1;
-                        return client;
-                    }
-                }
-            }
-            synchronized (signal) {
-                // wake every second in case this missed the signal
-                signal.wait(1000);
-            }
-        }
-    }
-
-    private void release(C client) {
-        synchronized (this) {
-            clients.addFirst(client);
-        }
-        synchronized (signal) {
-            signal.notify();
-        }
-    }
-
-    public int poolSize() {
-        return poolSize;
-    }
-
-    public boolean isClosed() {
-        return closed;
-    }
-}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/jdbc/DistributedLockDialectFactory.java
 
b/paimon-core/src/main/java/org/apache/paimon/jdbc/DistributedLockDialectFactory.java
index 197845638..698092e05 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/jdbc/DistributedLockDialectFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/jdbc/DistributedLockDialectFactory.java
@@ -35,8 +35,7 @@ class DistributedLockDialectFactory {
     /** Supported jdbc protocol. */
     enum JdbcProtocol {
         SQLITE,
-        // for mysql.
         MARIADB,
-        MYSQL;
+        MYSQL
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
index 5dc2abc9e..952b03607 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
@@ -58,34 +58,38 @@ import static 
org.apache.paimon.jdbc.JdbcUtils.insertProperties;
 import static org.apache.paimon.jdbc.JdbcUtils.updateTable;
 import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
 
+/* This file is based on source code from the Iceberg Project 
(http://iceberg.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
 /** Support jdbc catalog. */
 public class JdbcCatalog extends AbstractCatalog {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(JdbcCatalog.class);
+
     public static final String PROPERTY_PREFIX = "jdbc.";
     private static final String DATABASE_EXISTS_PROPERTY = "exists";
-    private JdbcClientPool connections;
-    private String catalogKey = "jdbc";
-    private Map<String, String> configuration;
+
+    private final JdbcClientPool connections;
+    private final String catalogKey;
+    private final Map<String, String> options;
     private final String warehouse;
 
     protected JdbcCatalog(
             FileIO fileIO, String catalogKey, Map<String, String> config, 
String warehouse) {
         super(fileIO);
-        if (!StringUtils.isBlank(catalogKey)) {
-            this.catalogKey = catalogKey;
-        }
-        this.configuration = config;
+        this.catalogKey = StringUtils.isBlank(catalogKey) ? "jdbc" : 
catalogKey;
+        this.options = config;
         this.warehouse = warehouse;
-        Preconditions.checkNotNull(configuration, "Invalid catalog properties: 
null");
+        Preconditions.checkNotNull(options, "Invalid catalog properties: 
null");
         this.connections =
                 new JdbcClientPool(
                         Integer.parseInt(
                                 config.getOrDefault(
                                         CatalogOptions.CLIENT_POOL_SIZE.key(),
                                         
CatalogOptions.CLIENT_POOL_SIZE.defaultValue().toString())),
-                        configuration.get(CatalogOptions.URI.key()),
-                        configuration);
+                        options.get(CatalogOptions.URI.key()),
+                        options);
         try {
             initializeCatalogTablesIfNeed();
         } catch (SQLException e) {
@@ -102,7 +106,7 @@ public class JdbcCatalog extends AbstractCatalog {
 
     /** Initialize catalog tables. */
     private void initializeCatalogTablesIfNeed() throws SQLException, 
InterruptedException {
-        String uri = configuration.get(CatalogOptions.URI.key());
+        String uri = options.get(CatalogOptions.URI.key());
         Preconditions.checkNotNull(uri, "JDBC connection URI is required");
         // Check and create catalog table.
         connections.run(
@@ -311,8 +315,7 @@ public class JdbcCatalog extends AbstractCatalog {
     protected void alterTableImpl(Identifier identifier, List<SchemaChange> 
changes)
             throws TableNotExistException, ColumnAlreadyExistException, 
ColumnNotExistException {
         if (!tableExists(identifier)) {
-            throw new RuntimeException(
-                    String.format("Table is not exists {}", 
identifier.getFullName()));
+            throw new RuntimeException("Table is not exists " + 
identifier.getFullName());
         }
         SchemaManager schemaManager = getSchemaManager(identifier);
         schemaManager.commitChanges(changes);
@@ -347,14 +350,13 @@ public class JdbcCatalog extends AbstractCatalog {
     @Override
     public Optional<CatalogLock.LockFactory> lockFactory() {
         return lockEnabled()
-                ? Optional.of(JdbcCatalogLock.createFactory(connections, 
catalogKey, configuration))
+                ? Optional.of(JdbcCatalogLock.createFactory(connections, 
catalogKey, options))
                 : Optional.empty();
     }
 
     private boolean lockEnabled() {
         return Boolean.parseBoolean(
-                configuration.getOrDefault(
-                        LOCK_ENABLED.key(), 
LOCK_ENABLED.defaultValue().toString()));
+                options.getOrDefault(LOCK_ENABLED.key(), 
LOCK_ENABLED.defaultValue().toString()));
     }
 
     private Lock lock(Identifier identifier) {
@@ -363,10 +365,7 @@ public class JdbcCatalog extends AbstractCatalog {
         }
         JdbcCatalogLock lock =
                 new JdbcCatalogLock(
-                        connections,
-                        catalogKey,
-                        checkMaxSleep(configuration),
-                        acquireTimeout(configuration));
+                        connections, catalogKey, checkMaxSleep(options), 
acquireTimeout(options));
         return Lock.fromCatalog(lock, identifier);
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcClientPool.java 
b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcClientPool.java
index e1a4cccf1..287af41d2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcClientPool.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcClientPool.java
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.jdbc;
 
-import org.apache.paimon.client.ClientPoolImpl;
+import org.apache.paimon.client.ClientPool;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -30,7 +30,7 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 /** Client pool for jdbc. */
-public class JdbcClientPool extends ClientPoolImpl<Connection, SQLException> {
+public class JdbcClientPool extends ClientPool.ClientPoolImpl<Connection, 
SQLException> {
 
     private static final Pattern PROTOCOL_PATTERN = 
Pattern.compile("jdbc:([^:]+):(.*)");
     private final String dbUrl;

Reply via email to