This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new fe20acc85 [core] Fix serialize problem when using jdbc catalog with
lock enabled in flink. (#3542)
fe20acc85 is described below
commit fe20acc8528fa58ce76dae13af8ebc24b6533985
Author: Wenchao Wu <[email protected]>
AuthorDate: Thu Jun 20 09:40:14 2024 +0800
[core] Fix serialize problem when using jdbc catalog with lock enabled in
flink. (#3542)
---
.../main/java/org/apache/paimon/jdbc/JdbcCatalog.java | 2 +-
.../apache/paimon/jdbc/JdbcCatalogLockContext.java | 13 ++++++++++---
.../java/org/apache/paimon/jdbc/JdbcCatalogTest.java | 19 +++++++++++++++++++
3 files changed, 30 insertions(+), 4 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
index 7e7718b5b..45600715b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
@@ -350,7 +350,7 @@ public class JdbcCatalog extends AbstractCatalog {
@Override
public Optional<CatalogLockContext> lockContext() {
- return Optional.of(new JdbcCatalogLockContext(connections, catalogKey,
options));
+ return Optional.of(new JdbcCatalogLockContext(catalogKey, options));
}
private Lock lock(Identifier identifier) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLockContext.java
b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLockContext.java
index e56b3474c..b109f271d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLockContext.java
+++
b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLockContext.java
@@ -19,17 +19,17 @@
package org.apache.paimon.jdbc;
import org.apache.paimon.catalog.CatalogLockContext;
+import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
/** Jdbc lock context. */
public class JdbcCatalogLockContext implements CatalogLockContext {
- private final JdbcClientPool connections;
+ private transient JdbcClientPool connections;
private final String catalogKey;
private final Options options;
- public JdbcCatalogLockContext(JdbcClientPool connections, String
catalogKey, Options options) {
- this.connections = connections;
+ public JdbcCatalogLockContext(String catalogKey, Options options) {
this.catalogKey = catalogKey;
this.options = options;
}
@@ -40,6 +40,13 @@ public class JdbcCatalogLockContext implements
CatalogLockContext {
}
public JdbcClientPool connections() {
+ if (connections == null) {
+ connections =
+ new JdbcClientPool(
+ options.get(CatalogOptions.CLIENT_POOL_SIZE),
+ options.get(CatalogOptions.URI.key()),
+ options.toMap());
+ }
return connections;
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
index cc1febeab..f0c84eb2c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
@@ -22,12 +22,15 @@ import org.apache.paimon.catalog.CatalogTestBase;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
+import org.apache.paimon.table.Table;
import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
@@ -36,6 +39,7 @@ import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
/** Tests for {@link JdbcCatalog}. */
public class JdbcCatalogTest extends CatalogTestBase {
@@ -112,4 +116,19 @@ public class JdbcCatalogTest extends CatalogTestBase {
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Table name [NEW_TABLE] cannot contain upper case
in the catalog.");
}
+
+ @Test
+ public void testSerializeTable() throws Exception {
+ catalog.createDatabase("test_db", false);
+ catalog.createTable(Identifier.create("test_db", "table"),
DEFAULT_TABLE_SCHEMA, false);
+ Table table = catalog.getTable(new Identifier("test_db", "table"));
+ assertDoesNotThrow(
+ () -> {
+ try (ByteArrayOutputStream baos = new
ByteArrayOutputStream();
+ ObjectOutputStream oos = new
ObjectOutputStream(baos)) {
+ oos.writeObject(table);
+ oos.flush();
+ }
+ });
+ }
}