This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 97ccc58d95 [core]: Supporting repair procedure for Jdbc catalogs (#7289)
97ccc58d95 is described below
commit 97ccc58d9504474d6f9527f6ea90745b8d7385c3
Author: junmuz <[email protected]>
AuthorDate: Thu Feb 26 09:35:26 2026 +0000
[core]: Supporting repair procedure for Jdbc catalogs (#7289)
---
.../java/org/apache/paimon/jdbc/JdbcCatalog.java | 100 ++++++++--
.../java/org/apache/paimon/jdbc/JdbcUtils.java | 21 +++
.../org/apache/paimon/jdbc/JdbcCatalogTest.java | 201 +++++++++++++++++++++
3 files changed, 309 insertions(+), 13 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
index 0891bad2bd..c37917d0e9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
@@ -53,6 +53,7 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.AbstractMap;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -61,6 +62,9 @@ import java.util.Set;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
+import static org.apache.paimon.catalog.CatalogUtils.checkNotBranch;
+import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase;
+import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable;
import static org.apache.paimon.jdbc.JdbcCatalogLock.acquireTimeout;
import static org.apache.paimon.jdbc.JdbcCatalogLock.checkMaxSleep;
import static org.apache.paimon.jdbc.JdbcUtils.deleteProperties;
@@ -112,6 +116,10 @@ public class JdbcCatalog extends AbstractCatalog {
return connections;
}
+    /** Returns the catalog key that this catalog passes to its JDBC lookups and inserts. */
+    public String getCatalogKey() {
+        return this.catalogKey;
+    }
+
/** Initialize catalog tables. */
private void initializeCatalogTablesIfNeed() throws SQLException,
InterruptedException {
String uri = options.get(CatalogOptions.URI.key());
@@ -301,19 +309,11 @@ public class JdbcCatalog extends AbstractCatalog {
runWithLock(identifier, () -> schemaManager.createTable(schema));
// Update schema metadata
Path path = getTableLocation(identifier);
- int insertRecord =
- connections.run(
- conn -> {
- try (PreparedStatement sql =
- conn.prepareStatement(
-
JdbcUtils.DO_COMMIT_CREATE_TABLE_SQL)) {
- sql.setString(1, catalogKey);
- sql.setString(2,
identifier.getDatabaseName());
- sql.setString(3,
identifier.getTableName());
- return sql.executeUpdate();
- }
- });
- if (insertRecord == 1) {
+ if (JdbcUtils.insertTable(
+ connections,
+ catalogKey,
+ identifier.getDatabaseName(),
+ identifier.getTableName())) {
LOG.debug("Successfully committed to new table: {}",
identifier);
} else {
try {
@@ -415,6 +415,80 @@ public class JdbcCatalog extends AbstractCatalog {
return Lock.fromCatalog(lock, identifier).runWithLock(callable);
}
+    /**
+     * Repairs every database listed under the warehouse path in the file system by delegating
+     * to {@link #repairDatabase(String)} for each of them, in listing order.
+     *
+     * @throws RuntimeException if the databases cannot be listed in the file system
+     */
+    @Override
+    public void repairCatalog() {
+        final List<String> fileSystemDatabases;
+        try {
+            fileSystemDatabases = listDatabasesInFileSystem(new Path(warehouse));
+        } catch (IOException ioe) {
+            throw new RuntimeException("Failed to list databases in file system", ioe);
+        }
+        fileSystemDatabases.forEach(this::repairDatabase);
+    }
+
+ @Override
+ public void repairDatabase(String databaseName) {
+ checkNotSystemDatabase(databaseName);
+
+ // First check if database exists in file system
+ Path databasePath = newDatabasePath(databaseName);
+ List<String> tables;
+ try {
+ if (!fileIO.exists(databasePath)) {
+ throw new RuntimeException("Database directory does not exist:
" + databasePath);
+ }
+ tables = listTablesInFileSystem(databasePath);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ if (!JdbcUtils.databaseExists(connections, catalogKey, databaseName)) {
+ createDatabaseImpl(databaseName, Collections.emptyMap());
+ }
+
+ // Repair tables
+ for (String table : tables) {
+ try {
+ repairTable(Identifier.create(databaseName, table));
+ } catch (TableNotExistException ignore) {
+ // Table might not exist due to concurrent operations
+ }
+ }
+ }
+
+    /**
+     * Repairs a single table: when the table has a schema in the file system but the JDBC
+     * lookup does not find it, a table row is inserted through {@code JdbcUtils.insertTable}.
+     * The database is created first (with empty properties) if the JDBC lookup does not find it.
+     * If the table is already found by the JDBC lookup, nothing is changed.
+     *
+     * <p>An insert that reports no inserted row is only logged at error level; no exception is
+     * raised for it.
+     *
+     * @param identifier table to repair; branch and system-table identifiers are rejected
+     * @throws TableNotExistException if no table schema is found in the file system
+     */
+    @Override
+    public void repairTable(Identifier identifier) throws TableNotExistException {
+        checkNotBranch(identifier, "repairTable");
+        checkNotSystemTable(identifier, "repairTable");
+
+        // The schema itself is not needed here; the lookup only proves that the table exists
+        // in the file system.
+        Path tableLocation = getTableLocation(identifier);
+        tableSchemaInFileSystem(tableLocation, identifier.getBranchNameOrDefault())
+                .orElseThrow(() -> new TableNotExistException(identifier));
+
+        String databaseName = identifier.getDatabaseName();
+        String tableName = identifier.getTableName();
+
+        if (!JdbcUtils.databaseExists(connections, catalogKey, databaseName)) {
+            createDatabaseImpl(databaseName, Collections.emptyMap());
+        }
+
+        // Table exists in file system; if it also exists in the JDBC catalog, nothing to repair.
+        if (JdbcUtils.tableExists(connections, catalogKey, databaseName, tableName)) {
+            return;
+        }
+
+        // Table missing from JDBC catalog, repair it.
+        if (JdbcUtils.insertTable(connections, catalogKey, databaseName, tableName)) {
+            LOG.debug("Successfully repaired table: {}", identifier);
+        } else {
+            LOG.error("Failed to repair table: {}", identifier);
+        }
+    }
+
@Override
public void close() throws Exception {
connections.close();
diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java
index 1b9e599d72..bbecda0bc6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java
@@ -377,6 +377,27 @@ public class JdbcUtils {
}
}
+    /**
+     * Inserts a table entry by executing {@code DO_COMMIT_CREATE_TABLE_SQL} with the catalog
+     * key, database name and table name bound as parameters 1 to 3.
+     *
+     * @param connections client pool used to run the statement
+     * @param catalogKey value bound to the first statement parameter
+     * @param databaseName value bound to the second statement parameter
+     * @param tableName value bound to the third statement parameter
+     * @return {@code true} if the statement reports exactly one updated row, {@code false}
+     *     otherwise
+     * @throws RuntimeException if the statement fails with a {@link SQLException}, or if the
+     *     calling thread is interrupted while running it (the interrupt flag is restored first)
+     */
+    public static boolean insertTable(
+            JdbcClientPool connections, String catalogKey, String databaseName, String tableName) {
+        try {
+            int insertRecord =
+                    connections.run(
+                            conn -> {
+                                try (PreparedStatement sql =
+                                        conn.prepareStatement(
+                                                JdbcUtils.DO_COMMIT_CREATE_TABLE_SQL)) {
+                                    sql.setString(1, catalogKey);
+                                    sql.setString(2, databaseName);
+                                    sql.setString(3, tableName);
+                                    return sql.executeUpdate();
+                                }
+                            });
+            return insertRecord == 1;
+        } catch (SQLException e) {
+            throw new RuntimeException("Failed to insert table: " + tableName, e);
+        } catch (InterruptedException e) {
+            // Wrapping the exception would otherwise hide the interruption from callers further
+            // up the stack, so re-set the flag before rethrowing.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Failed to insert table: " + tableName, e);
+        }
+    }
+
public static boolean insertProperties(
JdbcClientPool connections,
String storeKey,
diff --git a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
index 9e106ad966..9e7204ec9f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
@@ -24,8 +24,12 @@ import org.apache.paimon.catalog.CatalogTestBase;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.Table;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;
import org.junit.jupiter.api.BeforeEach;
@@ -34,6 +38,8 @@ import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -129,4 +135,199 @@ public class JdbcCatalogTest extends CatalogTestBase {
protected boolean supportsAlterDatabase() {
return true;
}
+
+    /** Repairing a table that was never created must fail with TableNotExistException. */
+    @Test
+    public void testRepairTableNotExist() throws Exception {
+        final String db = "repair_db";
+
+        // Only the database is created; no table is ever created inside it.
+        catalog.createDatabase(db, false);
+        final Identifier missingTable = Identifier.create(db, "nonexistent_table");
+
+        assertThatThrownBy(() -> catalog.repairTable(missingTable))
+                .isInstanceOf(Catalog.TableNotExistException.class);
+    }
+
+    /** Repairing an identifier inside the "sys" database must be rejected. */
+    @Test
+    public void testRepairTableWithSystemTable() {
+        final Identifier sysIdentifier = Identifier.create("sys", "system_table");
+
+        // Expect an argument error whose message mentions "sys".
+        assertThatThrownBy(() -> catalog.repairTable(sysIdentifier))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("sys");
+    }
+
+    /**
+     * Covers both repairTable paths: a no-op repair of a table that is already registered, and
+     * re-registration of a table whose row was removed from the JDBC store.
+     */
+    @Test
+    public void testRepairTable() throws Exception {
+        final String db = "fs_repair_db";
+        final String table = "fs_repair_table";
+        final Identifier tableId = Identifier.create(db, table);
+
+        // Regular creation (this creates both filesystem and JDBC entries).
+        catalog.createDatabase(db, false);
+        catalog.createTable(tableId, DEFAULT_TABLE_SCHEMA, false);
+        assertThat(catalog.listTables(db)).contains(table);
+        assertDoesNotThrow(() -> catalog.getTable(tableId));
+
+        // Repairing an already registered table must leave it listed and loadable.
+        assertDoesNotThrow(() -> catalog.repairTable(tableId));
+        assertThat(catalog.listTables(db)).contains(table);
+        assertDoesNotThrow(() -> catalog.getTable(tableId));
+
+        // Remove the table from the JDBC store only, leaving the filesystem intact.
+        final JdbcCatalog jdbc = (JdbcCatalog) catalog;
+        JdbcUtils.execute(
+                jdbc.getConnections(), JdbcUtils.DROP_TABLE_SQL, jdbc.getCatalogKey(), db, table);
+
+        // The catalog no longer knows the table.
+        assertThat(catalog.listTables(db)).doesNotContain(table);
+        assertThatThrownBy(() -> catalog.getTable(tableId))
+                .isInstanceOf(Catalog.TableNotExistException.class);
+
+        // Repair must bring the table back into the JDBC store.
+        assertDoesNotThrow(() -> catalog.repairTable(tableId));
+        assertThat(catalog.listTables(db)).contains(table);
+        assertDoesNotThrow(() -> catalog.getTable(tableId));
+    }
+
+    /**
+     * Covers both repairDatabase paths: a no-op repair of a fully registered database, and
+     * re-registration of the database and its tables after their JDBC rows were deleted.
+     */
+    @Test
+    public void testRepairDatabase() throws Exception {
+        final String db = "repair_database";
+        final Identifier first = Identifier.create(db, "table1");
+        final Identifier second = Identifier.create(db, "table2");
+
+        // Create the database and two tables.
+        catalog.createDatabase(db, false);
+        catalog.createTable(first, DEFAULT_TABLE_SCHEMA, false);
+        catalog.createTable(second, DEFAULT_TABLE_SCHEMA, false);
+
+        // Repairing an intact database must not throw and must keep both tables.
+        assertDoesNotThrow(() -> catalog.repairDatabase(db));
+        List<String> tablesAfterNoOpRepair = catalog.listTables(db);
+        assertThat(tablesAfterNoOpRepair).containsExactlyInAnyOrder("table1", "table2");
+
+        // Delete the table rows and then the database properties from the JDBC store.
+        final JdbcCatalog jdbc = (JdbcCatalog) catalog;
+        JdbcUtils.execute(
+                jdbc.getConnections(), JdbcUtils.DELETE_TABLES_SQL, jdbc.getCatalogKey(), db);
+        JdbcUtils.execute(
+                jdbc.getConnections(),
+                JdbcUtils.DELETE_ALL_DATABASE_PROPERTIES_SQL,
+                jdbc.getCatalogKey(),
+                db);
+
+        // The catalog no longer knows the database.
+        assertThat(catalog.listDatabases()).doesNotContain(db);
+        assertThatThrownBy(() -> catalog.getDatabase(db))
+                .isInstanceOf(Catalog.DatabaseNotExistException.class);
+
+        // Repair must recreate the database and both tables in the JDBC store.
+        assertDoesNotThrow(() -> catalog.repairDatabase(db));
+        assertThat(catalog.listDatabases()).contains(db);
+        assertThat(catalog.listTables(db)).containsExactlyInAnyOrder("table1", "table2");
+        assertDoesNotThrow(() -> catalog.getDatabase(db));
+    }
+
+ @Test
+ public void testRepairDatabaseSystemDatabase() {
+ // System database should not be repairable
+ assertThatThrownBy(() -> catalog.repairDatabase("sys"))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("sys");
+ }
+
+    /** Repairing a database that was never created must fail and must not register it. */
+    @Test
+    public void testRepairDatabaseNotExists() throws Exception {
+        final String unknownDb = "non_existent_db";
+
+        // The repair attempt is expected to fail with a RuntimeException.
+        assertThatThrownBy(() -> catalog.repairDatabase(unknownDb))
+                .isInstanceOf(RuntimeException.class);
+
+        // The failed repair must not have made the database visible in the catalog.
+        assertThat(catalog.listDatabases()).doesNotContain(unknownDb);
+    }
+
+    /**
+     * Runs repairCatalog over several intact databases and checks that every database and its
+     * table are still listed afterwards.
+     */
+    @Test
+    public void testRepairCatalog() throws Exception {
+        final String[] dbNames = {"repair_db1", "repair_db2", "repair_db3"};
+
+        // Two-column schema without primary keys, partition keys or options.
+        final Schema twoColumnSchema =
+                new Schema(
+                        Lists.newArrayList(
+                                new DataField(0, "id", DataTypes.INT()),
+                                new DataField(1, "data", DataTypes.STRING())),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        Collections.emptyMap(),
+                        "");
+
+        // One table per database.
+        for (String name : dbNames) {
+            catalog.createDatabase(name, false);
+            catalog.createTable(Identifier.create(name, "test_table"), twoColumnSchema, false);
+        }
+
+        // Repairing the whole catalog must not throw.
+        assertDoesNotThrow(() -> catalog.repairCatalog());
+
+        // Every database and its table must still be listed.
+        final List<String> listedDatabases = catalog.listDatabases();
+        for (String name : dbNames) {
+            assertThat(listedDatabases).contains(name);
+            assertThat(catalog.listTables(name)).contains("test_table");
+        }
+    }
+
+    /**
+     * Exercises JdbcUtils.insertTable directly: the first insert reports success, and inserting
+     * the same table a second time fails with a RuntimeException.
+     */
+    @Test
+    public void testInsertTableUtility() throws Exception {
+        final String db = "insert_test_db";
+        final String table = "insert_test_table";
+
+        catalog.createDatabase(db, false);
+        final JdbcCatalog jdbc = (JdbcCatalog) catalog;
+
+        // First insert of the table row must report success.
+        boolean inserted =
+                JdbcUtils.insertTable(jdbc.getConnections(), jdbc.getCatalogKey(), db, table);
+        assertThat(inserted).isTrue();
+
+        // Inserting the same table again should throw an exception for the duplicate.
+        assertThatThrownBy(
+                        () ->
+                                JdbcUtils.insertTable(
+                                        jdbc.getConnections(), jdbc.getCatalogKey(), db, table))
+                .isInstanceOf(RuntimeException.class)
+                .hasMessageContaining("Failed to insert table");
+    }
}