This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c1abbac6a [core] Fix license and minor for JdbcCatalog
c1abbac6a is described below
commit c1abbac6abceb3d5fb824f1028ee219a70148219
Author: Jingsong <[email protected]>
AuthorDate: Wed Mar 6 14:43:13 2024 +0800
[core] Fix license and minor for JdbcCatalog
---
LICENSE | 2 +
.../java/org/apache/paimon/client/ClientPool.java | 143 ++++++++++++++++++-
.../org/apache/paimon/client/ClientPoolImpl.java | 155 ---------------------
.../paimon/jdbc/DistributedLockDialectFactory.java | 3 +-
.../java/org/apache/paimon/jdbc/JdbcCatalog.java | 39 +++---
.../org/apache/paimon/jdbc/JdbcClientPool.java | 4 +-
6 files changed, 166 insertions(+), 180 deletions(-)
diff --git a/LICENSE b/LICENSE
index 8ce241d3f..97c849402 100644
--- a/LICENSE
+++ b/LICENSE
@@ -237,6 +237,8 @@
paimon-service/paimon-service-client/src/main/java/org/apache/paimon/service/net
paimon-service/paimon-service-client/src/main/java/org/apache/paimon/service/network/NetworkServer.java
from http://flink.apache.org/ version 1.17.0
+paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java
+paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
paimon-core/src/main/java/org/apache/paimon/utils/ZOrderByteUtils.java
paimon-core/src/test/java/org/apache/paimon/utils/TestZOrderByteUtil.java
paimon-hive/paimon-hive-common/src/test/java/org/apache/paimon/hive/TestHiveMetastore.java
diff --git a/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java b/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java
index f0e4f0741..deddf75a2 100644
--- a/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java
+++ b/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java
@@ -18,7 +18,20 @@
package org.apache.paimon.client;
-/** Source: [core/src/main/java/org/apache/iceberg/ClientPool.java]. */
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.util.ArrayDeque;
+import java.util.Deque;
+
+import static org.apache.paimon.utils.Preconditions.checkState;
+
+/* This file is based on source code from the Iceberg Project (http://iceberg.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/** Client pool for using multiple clients to execute actions. */
public interface ClientPool<C, E extends Exception> {
/** Action interface for client. */
interface Action<R, C, E extends Exception> {
@@ -28,4 +41,132 @@ public interface ClientPool<C, E extends Exception> {
<R> R run(Action<R, C, E> action) throws E, InterruptedException;
<R> R run(Action<R, C, E> action, boolean retry) throws E,
InterruptedException;
+
+ /** Default implementation for {@link ClientPool}. */
+ abstract class ClientPoolImpl<C, E extends Exception> implements
Closeable, ClientPool<C, E> {
+ private static final Logger LOG =
LoggerFactory.getLogger(ClientPoolImpl.class);
+
+ private final int poolSize;
+ private final Deque<C> clients;
+ private final Class<? extends E> reconnectExc;
+ private final Object signal = new Object();
+ private final boolean retryByDefault;
+ private volatile int currentSize;
+ private boolean closed;
+
+ public ClientPoolImpl(
+ int poolSize, Class<? extends E> reconnectExc, boolean
retryByDefault) {
+ this.poolSize = poolSize;
+ this.reconnectExc = reconnectExc;
+ this.clients = new ArrayDeque<>(poolSize);
+ this.currentSize = 0;
+ this.closed = false;
+ this.retryByDefault = retryByDefault;
+ }
+
+ @Override
+ public <R> R run(Action<R, C, E> action) throws E,
InterruptedException {
+ return run(action, retryByDefault);
+ }
+
+ @Override
+ public <R> R run(Action<R, C, E> action, boolean retry) throws E,
InterruptedException {
+ C client = get();
+ try {
+ return action.run(client);
+ } catch (Exception exc) {
+ if (retry && isConnectionException(exc)) {
+ try {
+ client = reconnect(client);
+ } catch (Exception ignored) {
+ // if reconnection throws any exception, rethrow the
original failure
+ throw reconnectExc.cast(exc);
+ }
+
+ return action.run(client);
+ }
+
+ throw exc;
+
+ } finally {
+ release(client);
+ }
+ }
+
+ protected abstract C newClient();
+
+ protected abstract C reconnect(C client);
+
+ protected boolean isConnectionException(Exception exc) {
+ return reconnectExc.isInstance(exc);
+ }
+
+ protected abstract void close(C client);
+
+ @Override
+ public void close() {
+ this.closed = true;
+ try {
+ while (currentSize > 0) {
+ if (!clients.isEmpty()) {
+ synchronized (this) {
+ if (!clients.isEmpty()) {
+ C client = clients.removeFirst();
+ close(client);
+ currentSize -= 1;
+ }
+ }
+ }
+ if (clients.isEmpty() && currentSize > 0) {
+ synchronized (signal) {
+ // wake every second in case this missed the signal
+ signal.wait(1000);
+ }
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.warn(
+ "Interrupted while shutting down pool. Some clients
may not be closed.", e);
+ }
+ }
+
+ private C get() throws InterruptedException {
+ checkState(!closed, "Cannot get a client from a closed pool");
+ while (true) {
+ if (!clients.isEmpty() || currentSize < poolSize) {
+ synchronized (this) {
+ if (!clients.isEmpty()) {
+ return clients.removeFirst();
+ } else if (currentSize < poolSize) {
+ C client = newClient();
+ currentSize += 1;
+ return client;
+ }
+ }
+ }
+ synchronized (signal) {
+ // wake every second in case this missed the signal
+ signal.wait(1000);
+ }
+ }
+ }
+
+ private void release(C client) {
+ synchronized (this) {
+ clients.addFirst(client);
+ }
+ synchronized (signal) {
+ signal.notify();
+ }
+ }
+
+ public int poolSize() {
+ return poolSize;
+ }
+
+ public boolean isClosed() {
+ return closed;
+ }
+ }
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/client/ClientPoolImpl.java b/paimon-common/src/main/java/org/apache/paimon/client/ClientPoolImpl.java
deleted file mode 100644
index dda336be2..000000000
--- a/paimon-common/src/main/java/org/apache/paimon/client/ClientPoolImpl.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.client;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Closeable;
-import java.util.ArrayDeque;
-import java.util.Deque;
-
-import static org.apache.paimon.utils.Preconditions.checkState;
-
-/** Source: [core/src/main/java/org/apache/iceberg/ClientPoolImpl.java]. */
-public abstract class ClientPoolImpl<C, E extends Exception>
- implements Closeable, ClientPool<C, E> {
- private static final Logger LOG =
LoggerFactory.getLogger(ClientPoolImpl.class);
-
- private final int poolSize;
- private final Deque<C> clients;
- private final Class<? extends E> reconnectExc;
- private final Object signal = new Object();
- private final boolean retryByDefault;
- private volatile int currentSize;
- private boolean closed;
-
- public ClientPoolImpl(int poolSize, Class<? extends E> reconnectExc,
boolean retryByDefault) {
- this.poolSize = poolSize;
- this.reconnectExc = reconnectExc;
- this.clients = new ArrayDeque<>(poolSize);
- this.currentSize = 0;
- this.closed = false;
- this.retryByDefault = retryByDefault;
- }
-
- @Override
- public <R> R run(Action<R, C, E> action) throws E, InterruptedException {
- return run(action, retryByDefault);
- }
-
- @Override
- public <R> R run(Action<R, C, E> action, boolean retry) throws E,
InterruptedException {
- C client = get();
- try {
- return action.run(client);
- } catch (Exception exc) {
- if (retry && isConnectionException(exc)) {
- try {
- client = reconnect(client);
- } catch (Exception ignored) {
- // if reconnection throws any exception, rethrow the
original failure
- throw reconnectExc.cast(exc);
- }
-
- return action.run(client);
- }
-
- throw exc;
-
- } finally {
- release(client);
- }
- }
-
- protected abstract C newClient();
-
- protected abstract C reconnect(C client);
-
- protected boolean isConnectionException(Exception exc) {
- return reconnectExc.isInstance(exc);
- }
-
- protected abstract void close(C client);
-
- @Override
- public void close() {
- this.closed = true;
- try {
- while (currentSize > 0) {
- if (!clients.isEmpty()) {
- synchronized (this) {
- if (!clients.isEmpty()) {
- C client = clients.removeFirst();
- close(client);
- currentSize -= 1;
- }
- }
- }
- if (clients.isEmpty() && currentSize > 0) {
- synchronized (signal) {
- // wake every second in case this missed the signal
- signal.wait(1000);
- }
- }
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOG.warn("Interrupted while shutting down pool. Some clients may
not be closed.", e);
- }
- }
-
- private C get() throws InterruptedException {
- checkState(!closed, "Cannot get a client from a closed pool");
- while (true) {
- if (!clients.isEmpty() || currentSize < poolSize) {
- synchronized (this) {
- if (!clients.isEmpty()) {
- return clients.removeFirst();
- } else if (currentSize < poolSize) {
- C client = newClient();
- currentSize += 1;
- return client;
- }
- }
- }
- synchronized (signal) {
- // wake every second in case this missed the signal
- signal.wait(1000);
- }
- }
- }
-
- private void release(C client) {
- synchronized (this) {
- clients.addFirst(client);
- }
- synchronized (signal) {
- signal.notify();
- }
- }
-
- public int poolSize() {
- return poolSize;
- }
-
- public boolean isClosed() {
- return closed;
- }
-}
diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/DistributedLockDialectFactory.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/DistributedLockDialectFactory.java
index 197845638..698092e05 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/DistributedLockDialectFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/DistributedLockDialectFactory.java
@@ -35,8 +35,7 @@ class DistributedLockDialectFactory {
/** Supported jdbc protocol. */
enum JdbcProtocol {
SQLITE,
- // for mysql.
MARIADB,
- MYSQL;
+ MYSQL
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
index 5dc2abc9e..952b03607 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
@@ -58,34 +58,38 @@ import static org.apache.paimon.jdbc.JdbcUtils.insertProperties;
import static org.apache.paimon.jdbc.JdbcUtils.updateTable;
import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
+/* This file is based on source code from the Iceberg Project (http://iceberg.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
/** Support jdbc catalog. */
public class JdbcCatalog extends AbstractCatalog {
private static final Logger LOG =
LoggerFactory.getLogger(JdbcCatalog.class);
+
public static final String PROPERTY_PREFIX = "jdbc.";
private static final String DATABASE_EXISTS_PROPERTY = "exists";
- private JdbcClientPool connections;
- private String catalogKey = "jdbc";
- private Map<String, String> configuration;
+
+ private final JdbcClientPool connections;
+ private final String catalogKey;
+ private final Map<String, String> options;
private final String warehouse;
protected JdbcCatalog(
FileIO fileIO, String catalogKey, Map<String, String> config,
String warehouse) {
super(fileIO);
- if (!StringUtils.isBlank(catalogKey)) {
- this.catalogKey = catalogKey;
- }
- this.configuration = config;
+ this.catalogKey = StringUtils.isBlank(catalogKey) ? "jdbc" :
catalogKey;
+ this.options = config;
this.warehouse = warehouse;
- Preconditions.checkNotNull(configuration, "Invalid catalog properties:
null");
+ Preconditions.checkNotNull(options, "Invalid catalog properties:
null");
this.connections =
new JdbcClientPool(
Integer.parseInt(
config.getOrDefault(
CatalogOptions.CLIENT_POOL_SIZE.key(),
CatalogOptions.CLIENT_POOL_SIZE.defaultValue().toString())),
- configuration.get(CatalogOptions.URI.key()),
- configuration);
+ options.get(CatalogOptions.URI.key()),
+ options);
try {
initializeCatalogTablesIfNeed();
} catch (SQLException e) {
@@ -102,7 +106,7 @@ public class JdbcCatalog extends AbstractCatalog {
/** Initialize catalog tables. */
private void initializeCatalogTablesIfNeed() throws SQLException,
InterruptedException {
- String uri = configuration.get(CatalogOptions.URI.key());
+ String uri = options.get(CatalogOptions.URI.key());
Preconditions.checkNotNull(uri, "JDBC connection URI is required");
// Check and create catalog table.
connections.run(
@@ -311,8 +315,7 @@ public class JdbcCatalog extends AbstractCatalog {
protected void alterTableImpl(Identifier identifier, List<SchemaChange>
changes)
throws TableNotExistException, ColumnAlreadyExistException,
ColumnNotExistException {
if (!tableExists(identifier)) {
- throw new RuntimeException(
- String.format("Table is not exists {}",
identifier.getFullName()));
+ throw new RuntimeException("Table is not exists " +
identifier.getFullName());
}
SchemaManager schemaManager = getSchemaManager(identifier);
schemaManager.commitChanges(changes);
@@ -347,14 +350,13 @@ public class JdbcCatalog extends AbstractCatalog {
@Override
public Optional<CatalogLock.LockFactory> lockFactory() {
return lockEnabled()
- ? Optional.of(JdbcCatalogLock.createFactory(connections,
catalogKey, configuration))
+ ? Optional.of(JdbcCatalogLock.createFactory(connections,
catalogKey, options))
: Optional.empty();
}
private boolean lockEnabled() {
return Boolean.parseBoolean(
- configuration.getOrDefault(
- LOCK_ENABLED.key(),
LOCK_ENABLED.defaultValue().toString()));
+ options.getOrDefault(LOCK_ENABLED.key(),
LOCK_ENABLED.defaultValue().toString()));
}
private Lock lock(Identifier identifier) {
@@ -363,10 +365,7 @@ public class JdbcCatalog extends AbstractCatalog {
}
JdbcCatalogLock lock =
new JdbcCatalogLock(
- connections,
- catalogKey,
- checkMaxSleep(configuration),
- acquireTimeout(configuration));
+ connections, catalogKey, checkMaxSleep(options),
acquireTimeout(options));
return Lock.fromCatalog(lock, identifier);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcClientPool.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcClientPool.java
index e1a4cccf1..287af41d2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcClientPool.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcClientPool.java
@@ -18,7 +18,7 @@
package org.apache.paimon.jdbc;
-import org.apache.paimon.client.ClientPoolImpl;
+import org.apache.paimon.client.ClientPool;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -30,7 +30,7 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
/** Client pool for jdbc. */
-public class JdbcClientPool extends ClientPoolImpl<Connection, SQLException> {
+public class JdbcClientPool extends ClientPool.ClientPoolImpl<Connection,
SQLException> {
private static final Pattern PROTOCOL_PATTERN =
Pattern.compile("jdbc:([^:]+):(.*)");
private final String dbUrl;