This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 97ccc58d95 [core]: Supporting repair procedure for Jdbc catalogs (#7289)
97ccc58d95 is described below

commit 97ccc58d9504474d6f9527f6ea90745b8d7385c3
Author: junmuz <[email protected]>
AuthorDate: Thu Feb 26 09:35:26 2026 +0000

    [core]: Supporting repair procedure for Jdbc catalogs (#7289)
---
 .../java/org/apache/paimon/jdbc/JdbcCatalog.java   | 100 ++++++++--
 .../java/org/apache/paimon/jdbc/JdbcUtils.java     |  21 +++
 .../org/apache/paimon/jdbc/JdbcCatalogTest.java    | 201 +++++++++++++++++++++
 3 files changed, 309 insertions(+), 13 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
index 0891bad2bd..c37917d0e9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
@@ -53,6 +53,7 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.AbstractMap;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -61,6 +62,9 @@ import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.catalog.CatalogUtils.checkNotBranch;
+import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase;
+import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable;
 import static org.apache.paimon.jdbc.JdbcCatalogLock.acquireTimeout;
 import static org.apache.paimon.jdbc.JdbcCatalogLock.checkMaxSleep;
 import static org.apache.paimon.jdbc.JdbcUtils.deleteProperties;
@@ -112,6 +116,10 @@ public class JdbcCatalog extends AbstractCatalog {
         return connections;
     }
 
+    public String getCatalogKey() {
+        return catalogKey;
+    }
+
     /** Initialize catalog tables. */
     private void initializeCatalogTablesIfNeed() throws SQLException, InterruptedException {
         String uri = options.get(CatalogOptions.URI.key());
@@ -301,19 +309,11 @@ public class JdbcCatalog extends AbstractCatalog {
             runWithLock(identifier, () -> schemaManager.createTable(schema));
             // Update schema metadata
             Path path = getTableLocation(identifier);
-            int insertRecord =
-                    connections.run(
-                            conn -> {
-                                try (PreparedStatement sql =
-                                        conn.prepareStatement(
-                                                JdbcUtils.DO_COMMIT_CREATE_TABLE_SQL)) {
-                                    sql.setString(1, catalogKey);
-                                    sql.setString(2, identifier.getDatabaseName());
-                                    sql.setString(3, identifier.getTableName());
-                                    return sql.executeUpdate();
-                                }
-                            });
-            if (insertRecord == 1) {
+            if (JdbcUtils.insertTable(
+                    connections,
+                    catalogKey,
+                    identifier.getDatabaseName(),
+                    identifier.getTableName())) {
                 LOG.debug("Successfully committed to new table: {}", identifier);
             } else {
                 try {
@@ -415,6 +415,80 @@ public class JdbcCatalog extends AbstractCatalog {
         return Lock.fromCatalog(lock, identifier).runWithLock(callable);
     }
 
+    @Override
+    public void repairCatalog() {
+        List<String> databases;
+        try {
+            databases = listDatabasesInFileSystem(new Path(warehouse));
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to list databases in file system", e);
+        }
+        for (String database : databases) {
+            repairDatabase(database);
+        }
+    }
+
+    @Override
+    public void repairDatabase(String databaseName) {
+        checkNotSystemDatabase(databaseName);
+
+        // First check if database exists in file system
+        Path databasePath = newDatabasePath(databaseName);
+        List<String> tables;
+        try {
+            if (!fileIO.exists(databasePath)) {
+                throw new RuntimeException("Database directory does not exist: " + databasePath);
+            }
+            tables = listTablesInFileSystem(databasePath);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        if (!JdbcUtils.databaseExists(connections, catalogKey, databaseName)) {
+            createDatabaseImpl(databaseName, Collections.emptyMap());
+        }
+
+        // Repair tables
+        for (String table : tables) {
+            try {
+                repairTable(Identifier.create(databaseName, table));
+            } catch (TableNotExistException ignore) {
+                // Table might not exist due to concurrent operations
+            }
+        }
+    }
+
+    @Override
+    public void repairTable(Identifier identifier) throws TableNotExistException {
+        checkNotBranch(identifier, "repairTable");
+        checkNotSystemTable(identifier, "repairTable");
+
+        // First check if table exists in file system
+        Path tableLocation = getTableLocation(identifier);
+        TableSchema tableSchema =
+                tableSchemaInFileSystem(tableLocation, identifier.getBranchNameOrDefault())
+                        .orElseThrow(() -> new TableNotExistException(identifier));
+
+        if (!JdbcUtils.databaseExists(connections, catalogKey, identifier.getDatabaseName())) {
+            createDatabaseImpl(identifier.getDatabaseName(), Collections.emptyMap());
+        }
+        // Table exists in file system, now check if it exists in JDBC catalog
+        if (!JdbcUtils.tableExists(
+                connections, catalogKey, identifier.getDatabaseName(), identifier.getTableName())) {
+            // Table missing from JDBC catalog, repair it
+            if (JdbcUtils.insertTable(
+                    connections,
+                    catalogKey,
+                    identifier.getDatabaseName(),
+                    identifier.getTableName())) {
+                LOG.debug("Successfully repaired table: {}", identifier);
+            } else {
+                LOG.error("Failed to repair table: {}", identifier);
+            }
+        }
+        // If table exists in both file system and JDBC catalog, nothing to repair
+    }
+
     @Override
     public void close() throws Exception {
         connections.close();
diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java
index 1b9e599d72..bbecda0bc6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java
@@ -377,6 +377,27 @@ public class JdbcUtils {
         }
     }
 
+    public static boolean insertTable(
+            JdbcClientPool connections, String catalogKey, String databaseName, String tableName) {
+        try {
+            int insertRecord =
+                    connections.run(
+                            conn -> {
+                                try (PreparedStatement sql =
+                                        conn.prepareStatement(
+                                                JdbcUtils.DO_COMMIT_CREATE_TABLE_SQL)) {
+                                    sql.setString(1, catalogKey);
+                                    sql.setString(2, databaseName);
+                                    sql.setString(3, tableName);
+                                    return sql.executeUpdate();
+                                }
+                            });
+            return insertRecord == 1;
+        } catch (SQLException | InterruptedException e) {
+            throw new RuntimeException("Failed to insert table: " + tableName, e);
+        }
+    }
+
     public static boolean insertProperties(
             JdbcClientPool connections,
             String storeKey,
diff --git a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
index 9e106ad966..9e7204ec9f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
@@ -24,8 +24,12 @@ import org.apache.paimon.catalog.CatalogTestBase;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
 
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
 import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;
 
 import org.junit.jupiter.api.BeforeEach;
@@ -34,6 +38,8 @@ import org.junit.jupiter.api.Test;
 import java.io.ByteArrayOutputStream;
 import java.io.ObjectOutputStream;
 import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
@@ -129,4 +135,199 @@ public class JdbcCatalogTest extends CatalogTestBase {
     protected boolean supportsAlterDatabase() {
         return true;
     }
+
+    @Test
+    public void testRepairTableNotExist() throws Exception {
+        String databaseName = "repair_db";
+        String tableName = "nonexistent_table";
+
+        catalog.createDatabase(databaseName, false);
+        Identifier identifier = Identifier.create(databaseName, tableName);
+
+        // Test repair on non-existent table - should throw TableNotExistException
+        assertThatThrownBy(() -> catalog.repairTable(identifier))
+                .isInstanceOf(Catalog.TableNotExistException.class);
+    }
+
+    @Test
+    public void testRepairTableWithSystemTable() {
+        Identifier systemTableId = Identifier.create("sys", "system_table");
+
+        // System tables should not be repairable
+        assertThatThrownBy(() -> catalog.repairTable(systemTableId))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("sys");
+    }
+
+    @Test
+    public void testRepairTable() throws Exception {
+        String databaseName = "fs_repair_db";
+        String tableName = "fs_repair_table";
+
+        // Create table normally (this creates both filesystem and JDBC entries)
+        catalog.createDatabase(databaseName, false);
+        Identifier identifier = Identifier.create(databaseName, tableName);
+        catalog.createTable(identifier, DEFAULT_TABLE_SCHEMA, false);
+
+        // Verify table exists in both places
+        assertThat(catalog.listTables(databaseName)).contains(tableName);
+        assertDoesNotThrow(() -> catalog.getTable(identifier));
+
+        // Repair on existing table should work fine (idempotent operation)
+        assertDoesNotThrow(() -> catalog.repairTable(identifier));
+
+        // Table should still exist and be accessible
+        assertThat(catalog.listTables(databaseName)).contains(tableName);
+        assertDoesNotThrow(() -> catalog.getTable(identifier));
+
+        // Test repair when table is missing from JDBC store
+        JdbcCatalog jdbcCatalog = (JdbcCatalog) catalog;
+
+        // Remove table from JDBC store but leave filesystem intact
+        JdbcUtils.execute(
+                jdbcCatalog.getConnections(),
+                JdbcUtils.DROP_TABLE_SQL,
+                jdbcCatalog.getCatalogKey(),
+                databaseName,
+                tableName);
+
+        // Verify table is missing from JDBC catalog
+        assertThat(catalog.listTables(databaseName)).doesNotContain(tableName);
+        assertThatThrownBy(() -> catalog.getTable(identifier))
+                .isInstanceOf(Catalog.TableNotExistException.class);
+
+        // Repair the table - should recreate it in JDBC store
+        assertDoesNotThrow(() -> catalog.repairTable(identifier));
+
+        // Verify table is back in JDBC catalog after repair
+        assertThat(catalog.listTables(databaseName)).contains(tableName);
+        assertDoesNotThrow(() -> catalog.getTable(identifier));
+    }
+
+    @Test
+    public void testRepairDatabase() throws Exception {
+        String databaseName = "repair_database";
+
+        // Create database and some tables
+        catalog.createDatabase(databaseName, false);
+        catalog.createTable(Identifier.create(databaseName, "table1"), DEFAULT_TABLE_SCHEMA, false);
+        catalog.createTable(Identifier.create(databaseName, "table2"), DEFAULT_TABLE_SCHEMA, false);
+
+        // Test repair database - should not throw exception and should work correctly
+        assertDoesNotThrow(() -> catalog.repairDatabase(databaseName));
+
+        // Verify tables still exist after repair
+        List<String> tables = catalog.listTables(databaseName);
+        assertThat(tables).containsExactlyInAnyOrder("table1", "table2");
+
+        // Test repair when database is missing from JDBC store
+        JdbcCatalog jdbcCatalog = (JdbcCatalog) catalog;
+
+        // Remove database from JDBC store (this also removes tables)
+        JdbcUtils.execute(
+                jdbcCatalog.getConnections(),
+                JdbcUtils.DELETE_TABLES_SQL,
+                jdbcCatalog.getCatalogKey(),
+                databaseName);
+        JdbcUtils.execute(
+                jdbcCatalog.getConnections(),
+                JdbcUtils.DELETE_ALL_DATABASE_PROPERTIES_SQL,
+                jdbcCatalog.getCatalogKey(),
+                databaseName);
+
+        // Verify database is missing from JDBC catalog
+        assertThat(catalog.listDatabases()).doesNotContain(databaseName);
+        assertThatThrownBy(() -> catalog.getDatabase(databaseName))
+                .isInstanceOf(Catalog.DatabaseNotExistException.class);
+
+        // Repair the database - should recreate database and tables in JDBC store
+        assertDoesNotThrow(() -> catalog.repairDatabase(databaseName));
+
+        // Verify database and tables are back in JDBC catalog after repair
+        assertThat(catalog.listDatabases()).contains(databaseName);
+        assertThat(catalog.listTables(databaseName)).containsExactlyInAnyOrder("table1", "table2");
+        assertDoesNotThrow(() -> catalog.getDatabase(databaseName));
+    }
+
+    @Test
+    public void testRepairDatabaseSystemDatabase() {
+        // System database should not be repairable
+        assertThatThrownBy(() -> catalog.repairDatabase("sys"))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("sys");
+    }
+
+    @Test
+    public void testRepairDatabaseNotExists() throws Exception {
+        String nonExistentDb = "non_existent_db";
+
+        // Repairing a non-existent database should throw RuntimeException
+        assertThatThrownBy(() -> catalog.repairDatabase(nonExistentDb))
+                .isInstanceOf(RuntimeException.class);
+
+        // Database should not exist after failed repair
+        assertThat(catalog.listDatabases()).doesNotContain(nonExistentDb);
+    }
+
+    @Test
+    public void testRepairCatalog() throws Exception {
+        // Create multiple databases with tables
+        String[] databases = {"repair_db1", "repair_db2", "repair_db3"};
+
+        Schema schema =
+                new Schema(
+                        Lists.newArrayList(
+                                new DataField(0, "id", DataTypes.INT()),
+                                new DataField(1, "data", DataTypes.STRING())),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        Collections.emptyMap(),
+                        "");
+
+        for (String dbName : databases) {
+            catalog.createDatabase(dbName, false);
+            catalog.createTable(Identifier.create(dbName, "test_table"), schema, false);
+        }
+
+        // Test repair entire catalog - should not throw exception
+        assertDoesNotThrow(() -> catalog.repairCatalog());
+
+        // Verify all databases and tables still exist
+        List<String> catalogDatabases = catalog.listDatabases();
+        for (String dbName : databases) {
+            assertThat(catalogDatabases).contains(dbName);
+            assertThat(catalog.listTables(dbName)).contains("test_table");
+        }
+    }
+
+    @Test
+    public void testInsertTableUtility() throws Exception {
+        String databaseName = "insert_test_db";
+        String tableName = "insert_test_table";
+
+        catalog.createDatabase(databaseName, false);
+
+        JdbcCatalog jdbcCatalog = (JdbcCatalog) catalog;
+
+        // Test insertTable utility method
+        boolean result =
+                JdbcUtils.insertTable(
+                        jdbcCatalog.getConnections(),
+                        jdbcCatalog.getCatalogKey(),
+                        databaseName,
+                        tableName);
+
+        assertThat(result).isTrue();
+
+        // Try inserting the same table again - should throw exception for duplicate
+        assertThatThrownBy(
+                        () ->
+                                JdbcUtils.insertTable(
+                                        jdbcCatalog.getConnections(),
+                                        jdbcCatalog.getCatalogKey(),
+                                        databaseName,
+                                        tableName))
+                .isInstanceOf(RuntimeException.class)
+                .hasMessageContaining("Failed to insert table");
+    }
 }

Reply via email to