This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 601145620c3fa8d001ba19c4dc915a3ffd7a1656
Author: zhuangchong <[email protected]>
AuthorDate: Tue May 21 11:18:34 2024 +0800

    [core][hive] Introduce repair metastore procedure
    
    This closes #3355
---
 docs/content/flink/procedures.md                   |  18 +++
 docs/content/spark/procedures.md                   |  10 ++
 .../org/apache/paimon/catalog/AbstractCatalog.java |  66 +++++++--
 .../java/org/apache/paimon/catalog/Catalog.java    |  12 ++
 .../apache/paimon/catalog/FileSystemCatalog.java   |  42 +-----
 .../paimon/flink/procedure/RepairProcedure.java    |  78 +++++++++++
 .../services/org.apache.paimon.factories.Factory   |   1 +
 .../java/org/apache/paimon/hive/HiveCatalog.java   | 149 ++++++++++++++++++++-
 .../apache/paimon/hive/HiveCatalogITCaseBase.java  | 109 +++++++++++++++
 .../org/apache/paimon/spark/SparkProcedures.java   |   2 +
 .../spark/procedure/MigrateTableProcedure.java     |   2 +-
 .../paimon/spark/procedure/RepairProcedure.java    | 111 +++++++++++++++
 12 files changed, 548 insertions(+), 52 deletions(-)

diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index 5b1acb59e..a1a8bfe37 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -237,6 +237,24 @@ All available procedures are listed below.
          CALL sys.expire_snapshots(`table` => 'default.T', older_than => '2024-01-01 12:00:00', retain_min => 10)<br/><br/>
          CALL sys.expire_snapshots(`table` => 'default.T', older_than => '2024-01-01 12:00:00', max_deletes => 10)<br/><br/>
       </td>
+   </tr>
+    <tr>
+      <td>repair</td>
+      <td>
+         -- repair all databases and tables in catalog<br/>
+         CALL sys.repair()<br/><br/>
+         -- repair all tables in a specific database<br/>
+         CALL sys.repair('databaseName')<br/><br/>
+         -- repair a table<br/>
+         CALL sys.repair('databaseName.tableName')<br/><br/>
+      </td>
+      <td>
+         Synchronize information from the file system to the Metastore. Argument:
+            <li>empty: all databases and tables in the catalog.</li>
+            <li>databaseName: the target database name.</li>
+            <li>databaseName.tableName: the target table identifier.</li>
+      </td>
+      <td>CALL sys.repair('test_db.T')</td>
    </tr>
    </tbody>
 </table>
diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md
index be1551daf..81d829583 100644
--- a/docs/content/spark/procedures.md
+++ b/docs/content/spark/procedures.md
@@ -124,5 +124,15 @@ This section introduce all available spark procedures 
about paimon.
          CALL sys.remove_orphan_files(table => 'default.T', older_than => '2023-10-31 12:00:00')
       </td>
     </tr>
+    <tr>
+      <td>repair</td>
+      <td>
+         Synchronize information from the file system to the Metastore. Argument:
+            <li>database_or_table: empty for all databases and tables in the catalog, a database name, or a table identifier (databaseName.tableName).</li>
+      </td>
+      <td>
+          CALL sys.repair('test_db.T')
+      </td>
+    </tr>
     </tbody>
 </table>
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 605fd14f6..63428635e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.factories.FactoryUtil;
 import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.lineage.LineageMetaFactory;
 import org.apache.paimon.operation.FileStoreCommit;
@@ -29,6 +30,7 @@ import org.apache.paimon.operation.Lock;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.CatalogEnvironment;
 import org.apache.paimon.table.FileStoreTable;
@@ -48,6 +50,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.options.CatalogOptions.LINEAGE_META;
@@ -124,6 +127,17 @@ public abstract class AbstractCatalog implements Catalog {
         return catalogOptions.get(LOCK_ENABLED);
     }
 
+    /**
+     * Lists the databases stored directly under {@code warehouse}.
+     *
+     * <p>Every sub-directory whose name ends with {@code DB_SUFFIX} counts as a database; the
+     * returned names have that suffix stripped. Listing failures are rethrown unchecked.
+     */
+    protected List<String> listDatabases(Path warehouse) {
+        List<String> result = new ArrayList<>();
+        for (FileStatus entry : uncheck(() -> fileIO.listDirectories(warehouse))) {
+            Path entryPath = entry.getPath();
+            if (!entry.isDir() || !isDatabase(entryPath)) {
+                continue;
+            }
+            result.add(database(entryPath));
+        }
+        return result;
+    }
+
     @Override
     public boolean databaseExists(String databaseName) {
         if (isSystemDatabase(databaseName)) {
@@ -138,9 +152,7 @@ public abstract class AbstractCatalog implements Catalog {
     @Override
     public void createDatabase(String name, boolean ignoreIfExists, 
Map<String, String> properties)
             throws DatabaseAlreadyExistException {
-        if (isSystemDatabase(name)) {
-            throw new ProcessSystemDatabaseException();
-        }
+        checkNotSystemDatabase(name);
         if (databaseExists(name)) {
             if (ignoreIfExists) {
                 return;
@@ -179,9 +191,7 @@ public abstract class AbstractCatalog implements Catalog {
     @Override
     public void dropDatabase(String name, boolean ignoreIfNotExists, boolean 
cascade)
             throws DatabaseNotExistException, DatabaseNotEmptyException {
-        if (isSystemDatabase(name)) {
-            throw new ProcessSystemDatabaseException();
-        }
+        checkNotSystemDatabase(name);
         if (!databaseExists(name)) {
             if (ignoreIfNotExists) {
                 return;
@@ -210,8 +220,22 @@ public abstract class AbstractCatalog implements Catalog {
         return 
listTablesImpl(databaseName).stream().sorted().collect(Collectors.toList());
     }
 
+    /**
+     * Lists the tables stored directly under {@code databasePath}: the names of sub-directories
+     * for which {@link #tableExists(Path)} holds, in listing order (not sorted).
+     */
+    protected List<String> listTablesImpl(Path databasePath) {
+        List<String> tableNames = new ArrayList<>();
+        for (FileStatus entry : uncheck(() -> fileIO.listDirectories(databasePath))) {
+            if (!entry.isDir()) {
+                continue;
+            }
+            Path entryPath = entry.getPath();
+            if (tableExists(entryPath)) {
+                tableNames.add(entryPath.getName());
+            }
+        }
+        return tableNames;
+    }
+
     protected abstract List<String> listTablesImpl(String databaseName);
 
+    /** Returns whether {@code tablePath} holds a Paimon table, i.e. lists at least one schema id. */
+    protected boolean tableExists(Path tablePath) {
+        SchemaManager schemaManager = new SchemaManager(fileIO, tablePath);
+        return schemaManager.listAllIds().size() > 0;
+    }
+
     @Override
     public void dropTable(Identifier identifier, boolean ignoreIfNotExists)
             throws TableNotExistException {
@@ -396,7 +420,7 @@ public abstract class AbstractCatalog implements Catalog {
         return isSystemDatabase(identifier.getDatabaseName()) || 
isSpecifiedSystemTable(identifier);
     }
 
-    private void checkNotSystemTable(Identifier identifier, String method) {
+    protected void checkNotSystemTable(Identifier identifier, String method) {
         if (isSystemTable(identifier)) {
             throw new IllegalArgumentException(
                     String.format(
@@ -439,6 +463,13 @@ public abstract class AbstractCatalog implements Catalog {
         return SYSTEM_DATABASE_NAME.equals(database);
     }
 
+    /**
+     * Rejects the system database, throwing {@code ProcessSystemDatabaseException} when {@code
+     * database} names it and returning normally otherwise.
+     *
+     * @param database name of the database to check
+     */
+    protected void checkNotSystemDatabase(String database) {
+        if (!isSystemDatabase(database)) {
+            return;
+        }
+        throw new ProcessSystemDatabaseException();
+    }
+
     /** Validate database, table and field names must be lowercase when not 
case-sensitive. */
     public static void validateCaseInsensitive(
             boolean caseSensitive, String type, String... names) {
@@ -460,7 +491,7 @@ public abstract class AbstractCatalog implements Catalog {
                         type, illegalNames));
     }
 
-    private void validateIdentifierNameCaseInsensitive(Identifier identifier) {
+    protected void validateIdentifierNameCaseInsensitive(Identifier 
identifier) {
         validateCaseInsensitive(caseSensitive(), "Database", 
identifier.getDatabaseName());
         validateCaseInsensitive(caseSensitive(), "Table", 
identifier.getObjectName());
     }
@@ -479,7 +510,7 @@ public abstract class AbstractCatalog implements Catalog {
         validateFieldNameCaseInsensitive(fieldNames);
     }
 
-    private void validateFieldNameCaseInsensitive(List<String> fieldNames) {
+    protected void validateFieldNameCaseInsensitive(List<String> fieldNames) {
         validateCaseInsensitive(caseSensitive(), "Field", fieldNames);
     }
 
@@ -493,4 +524,21 @@ public abstract class AbstractCatalog implements Catalog {
                         "The value of %s property should be %s.",
                         CoreOptions.AUTO_CREATE.key(), Boolean.FALSE));
     }
+
+    /** Returns whether the directory name of {@code path} ends with {@code DB_SUFFIX}. */
+    private static boolean isDatabase(Path path) {
+        String dirName = path.getName();
+        return dirName.endsWith(DB_SUFFIX);
+    }
+
+    /** Returns the directory name of {@code path} with the trailing {@code DB_SUFFIX} removed. */
+    private static String database(Path path) {
+        String dirName = path.getName();
+        int end = dirName.length() - DB_SUFFIX.length();
+        return dirName.substring(0, end);
+    }
+
+    /**
+     * Runs {@code callable} and returns its result; any exception it throws (checked or not) is
+     * rethrown wrapped in a {@link RuntimeException}.
+     */
+    protected static <T> T uncheck(Callable<T> callable) {
+        T value;
+        try {
+            value = callable.call();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        return value;
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index 4f6341417..96e85d20a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -258,6 +258,18 @@ public interface Catalog extends AutoCloseable {
         return true;
     }
 
+    /**
+     * Synchronizes all databases and tables of this catalog from the file system to its
+     * metastore.
+     *
+     * <p>The default implementation throws; catalogs that keep a metastore in sync with the file
+     * system (such as the Hive catalog) override it.
+     *
+     * @throws UnsupportedOperationException if this catalog does not support repair
+     */
+    default void repairCatalog() {
+        throw new UnsupportedOperationException(
+                "repairCatalog is not supported by " + getClass().getName());
+    }
+
+    /**
+     * Synchronizes a database and all of its tables from the file system to the metastore.
+     *
+     * @param databaseName name of the database to repair
+     * @throws UnsupportedOperationException if this catalog does not support repair
+     */
+    default void repairDatabase(String databaseName) {
+        throw new UnsupportedOperationException(
+                "repairDatabase is not supported by " + getClass().getName());
+    }
+
+    /**
+     * Synchronizes a single table (and, where applicable, its partitions) from the file system to
+     * the metastore.
+     *
+     * @param identifier the table to repair
+     * @throws TableNotExistException may be thrown by implementations when the table is missing
+     * @throws UnsupportedOperationException if this catalog does not support repair
+     */
+    default void repairTable(Identifier identifier) throws TableNotExistException {
+        throw new UnsupportedOperationException(
+                "repairTable is not supported by " + getClass().getName());
+    }
+
     /** Exception for trying to drop on a database that is not empty. */
     class DatabaseNotEmptyException extends Exception {
         private static final String MSG = "Database %s is not empty.";
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
index 1e4e5b0eb..60e688bf8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.catalog;
 
 import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.operation.Lock;
 import org.apache.paimon.options.Options;
@@ -31,11 +30,9 @@ import org.apache.paimon.schema.TableSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Callable;
 
 import static 
org.apache.paimon.catalog.FileSystemCatalogOptions.CASE_SENSITIVE;
 
@@ -58,14 +55,7 @@ public class FileSystemCatalog extends AbstractCatalog {
 
     @Override
     public List<String> listDatabases() {
-        List<String> databases = new ArrayList<>();
-        for (FileStatus status : uncheck(() -> 
fileIO.listDirectories(warehouse))) {
-            Path path = status.getPath();
-            if (status.isDir() && isDatabase(path)) {
-                databases.add(database(path));
-            }
-        }
-        return databases;
+        return listDatabases(warehouse);
     }
 
     @Override
@@ -99,14 +89,7 @@ public class FileSystemCatalog extends AbstractCatalog {
 
     @Override
     protected List<String> listTablesImpl(String databaseName) {
-        List<String> tables = new ArrayList<>();
-        for (FileStatus status :
-                uncheck(() -> 
fileIO.listDirectories(newDatabasePath(databaseName)))) {
-            if (status.isDir() && tableExists(status.getPath())) {
-                tables.add(status.getPath().getName());
-            }
-        }
-        return tables;
+        return listTablesImpl(newDatabasePath(databaseName));
     }
 
     @Override
@@ -118,10 +101,6 @@ public class FileSystemCatalog extends AbstractCatalog {
         return tableExists(getDataTableLocation(identifier));
     }
 
-    private boolean tableExists(Path tablePath) {
-        return new SchemaManager(fileIO, tablePath).listAllIds().size() > 0;
-    }
-
     @Override
     public TableSchema getDataTableSchema(Identifier identifier) throws 
TableNotExistException {
         return schemaManager(identifier)
@@ -170,23 +149,6 @@ public class FileSystemCatalog extends AbstractCatalog {
         schemaManager(identifier).commitChanges(changes);
     }
 
-    private static <T> T uncheck(Callable<T> callable) {
-        try {
-            return callable.call();
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    private static boolean isDatabase(Path path) {
-        return path.getName().endsWith(DB_SUFFIX);
-    }
-
-    private static String database(Path path) {
-        String name = path.getName();
-        return name.substring(0, name.length() - DB_SUFFIX.length());
-    }
-
     @Override
     public void close() throws Exception {}
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RepairProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RepairProcedure.java
new file mode 100644
index 000000000..d637eb0b7
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RepairProcedure.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+
+/**
+ * Repair procedure: synchronizes information from the file system to the catalog's metastore by
+ * delegating to {@link Catalog#repairCatalog()}, {@link Catalog#repairDatabase(String)} or {@link
+ * Catalog#repairTable(Identifier)} depending on the argument. Usage:
+ *
+ * <pre><code>
+ *  -- repair all databases and tables in catalog
+ *  CALL sys.repair()
+ *
+ *  -- repair all tables in a specific database
+ *  CALL sys.repair('databaseName')
+ *
+ *  -- repair a table
+ *  CALL sys.repair('databaseName.tableName')
+ * </code></pre>
+ */
+public class RepairProcedure extends ProcedureBase {
+
+    public static final String IDENTIFIER = "repair";
+
+    private static final String SUCCESS = "Success";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    /** Repairs every database and table of the catalog. */
+    public String[] call(ProcedureContext procedureContext)
+            throws Catalog.TableNotExistException, Catalog.DatabaseNotExistException {
+        return call(procedureContext, null);
+    }
+
+    /**
+     * Repairs the whole catalog, one database, or one table.
+     *
+     * @param identifier blank for the whole catalog, {@code database} for one database, or {@code
+     *     database.table} for one table
+     * @throws IllegalArgumentException if {@code identifier} has more than two parts or an empty
+     *     part (e.g. {@code "db."}, {@code ".t"}, {@code "db..t"})
+     */
+    public String[] call(ProcedureContext procedureContext, String identifier)
+            throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException {
+        if (StringUtils.isBlank(identifier)) {
+            catalog.repairCatalog();
+            return new String[] {SUCCESS};
+        }
+
+        // Limit -1 keeps trailing empty strings, so "db." is rejected instead of silently being
+        // treated as database "db".
+        String[] paths = identifier.split("\\.", -1);
+        for (String part : paths) {
+            if (part.isEmpty()) {
+                throw invalidIdentifier(identifier);
+            }
+        }
+
+        switch (paths.length) {
+            case 1:
+                catalog.repairDatabase(paths[0]);
+                break;
+            case 2:
+                catalog.repairTable(Identifier.create(paths[0], paths[1]));
+                break;
+            default:
+                throw invalidIdentifier(identifier);
+        }
+        return new String[] {SUCCESS};
+    }
+
+    /** Builds the error raised for an argument that is neither a database nor a table name. */
+    private static IllegalArgumentException invalidIdentifier(String identifier) {
+        return new IllegalArgumentException(
+                String.format(
+                        "Invalid argument '%s': expected empty, 'databaseName' or"
+                                + " 'databaseName.tableName'.",
+                        identifier));
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 943da3e16..848dd317d 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -50,3 +50,4 @@ 
org.apache.paimon.flink.procedure.privilege.CreatePrivilegedUserProcedure
 org.apache.paimon.flink.procedure.privilege.DropPrivilegedUserProcedure
 org.apache.paimon.flink.procedure.privilege.GrantPrivilegeToUserProcedure
 org.apache.paimon.flink.procedure.privilege.RevokePrivilegeFromUserProcedure
+org.apache.paimon.flink.procedure.RepairProcedure
diff --git 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 0d71e70a6..77a3b40af 100644
--- 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -26,6 +26,7 @@ import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogLockContext;
 import org.apache.paimon.catalog.CatalogLockFactory;
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.metastore.MetastoreClient;
@@ -40,9 +41,16 @@ import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.CatalogEnvironment;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.TableType;
+import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.RowDataPartitionComputer;
 
 import org.apache.flink.table.hive.LegacyHiveClasses;
 import org.apache.hadoop.conf.Configuration;
@@ -72,11 +80,13 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -89,6 +99,7 @@ import static 
org.apache.paimon.hive.HiveCatalogOptions.IDENTIFIER;
 import static org.apache.paimon.hive.HiveCatalogOptions.LOCATION_IN_PROPERTIES;
 import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE;
 import static 
org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
+import static org.apache.paimon.utils.FileUtils.COMMON_IO_FORK_JOIN_POOL;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly;
 
@@ -336,6 +347,10 @@ public class HiveCatalog extends AbstractCatalog {
             throw new TableNotExistException(identifier);
         }
         Path tableLocation = getDataTableLocation(identifier);
+        return getDataTableSchema(tableLocation);
+    }
+
+    private TableSchema getDataTableSchema(Path tableLocation) {
         return new SchemaManager(fileIO, tableLocation)
                 .latest()
                 .orElseThrow(
@@ -422,7 +437,7 @@ public class HiveCatalog extends AbstractCatalog {
             client.alter_table(fromDB, fromTableName, table);
 
             Path fromPath = getDataTableLocation(fromTable);
-            if (new SchemaManager(fileIO, fromPath).listAllIds().size() > 0) {
+            if (!new SchemaManager(fileIO, fromPath).listAllIds().isEmpty()) {
                 // Rename the file system's table directory. Maintain 
consistency between tables in
                 // the file system and tables in the Hive Metastore.
                 Path toPath = getDataTableLocation(toTable);
@@ -471,6 +486,136 @@ public class HiveCatalog extends AbstractCatalog {
         return false;
     }
 
+    /**
+     * Repairs every database found under the warehouse directory. Databases are discovered from
+     * the file system rather than from the metastore, so databases missing from the metastore are
+     * created there by {@link #repairDatabase(String)}.
+     */
+    @Override
+    public void repairCatalog() {
+        listDatabases(new Path(warehouse)).forEach(this::repairDatabase);
+    }
+
+    /**
+     * Ensures the database exists in the Hive metastore, then repairs every Paimon table found
+     * under its location, concurrently on {@code COMMON_IO_FORK_JOIN_POOL}. Blocks until all table
+     * repairs have finished.
+     */
+    @Override
+    public void repairDatabase(String databaseName) {
+        Path databasePath = repairHmsDatabase(databaseName);
+        CompletableFuture<?>[] repairs =
+                listTablesImpl(databasePath).stream()
+                        .map(table -> Identifier.create(databaseName, table))
+                        .map(
+                                identifier ->
+                                        CompletableFuture.runAsync(
+                                                () -> repairTableIgnoringMissing(identifier),
+                                                COMMON_IO_FORK_JOIN_POOL))
+                        .toArray(CompletableFuture[]::new);
+        CompletableFuture.allOf(repairs).join();
+    }
+
+    /** Repairs one table, logging (instead of failing) when it does not exist in Paimon. */
+    private void repairTableIgnoringMissing(Identifier identifier) {
+        try {
+            repairTable(identifier);
+        } catch (TableNotExistException e) {
+            LOG.error("Table {} does not exist in the paimon.", identifier.getFullName());
+        }
+    }
+
+    /**
+     * Makes sure {@code databaseName} exists in the Hive metastore, creating it with empty
+     * properties when missing, and returns the database location computed by the location helper
+     * from the metastore entry. Fails via {@code checkNotSystemDatabase} for the system database.
+     */
+    private Path repairHmsDatabase(String databaseName) {
+        checkNotSystemDatabase(databaseName);
+        if (!databaseExistsImpl(databaseName)) {
+            createDatabaseImpl(databaseName, Collections.emptyMap());
+        }
+
+        try {
+            Database database = client.getDatabase(databaseName);
+            return new Path(locationHelper.getDatabaseLocation(database));
+        } catch (Exception e) {
+            // Existence was already ensured above; what can fail here is fetching the database
+            // or resolving its location, so report that rather than an existence check.
+            throw new RuntimeException(
+                    "Failed to get the location of database "
+                            + databaseName
+                            + " from hive metastore.",
+                    e);
+        }
+    }
+
+    /**
+     * Creates or updates the Hive metastore entry of a Paimon table from the table's latest schema
+     * on the file system, then registers its partitions via {@code repairPartition}.
+     *
+     * <p>If the Hive table exists it must be a Paimon table and is altered in place; if it does
+     * not exist ({@link NoSuchObjectException}) it is created after {@code
+     * copyTableDefaultOptions} has been applied to the table options. Any exception raised from
+     * the metastore step onward (including the "not a paimon table" check) is rethrown wrapped in
+     * a {@link RuntimeException}.
+     *
+     * <p>NOTE(review): no statement here visibly throws {@link TableNotExistException}; what
+     * happens when no schema exists at the table location depends on {@code
+     * getDataTableSchema(Path)} — confirm the declared exception is reachable.
+     */
+    @Override
+    public void repairTable(Identifier identifier) throws 
TableNotExistException {
+        checkNotSystemTable(identifier, "repairTable");
+        validateIdentifierNameCaseInsensitive(identifier);
+
+        // Load the paimon table from the file system; this is the source of truth for the repair.
+        Path paimonTableLocation = getDataTableLocation(identifier);
+        TableSchema tableSchema = getDataTableSchema(paimonTableLocation);
+        validateFieldNameCaseInsensitive(tableSchema.fieldNames());
+        // NOTE(review): this passes super.metastoreClientFactory (the AbstractCatalog one), while
+        // repairPartition uses this class's override — confirm the difference is intentional.
+        FileStoreTable paimonTable =
+                FileStoreTableFactory.create(
+                        fileIO,
+                        paimonTableLocation,
+                        tableSchema,
+                        new CatalogEnvironment(
+                                Lock.factory(
+                                        lockFactory().orElse(null),
+                                        lockContext().orElse(null),
+                                        identifier),
+                                
super.metastoreClientFactory(identifier).orElse(null),
+                                lineageMetaFactory));
+
+        try {
+            try {
+                // Hive table exists: verify it is a paimon table and overwrite its definition.
+                Table table =
+                        client.getTable(identifier.getDatabaseName(), 
identifier.getObjectName());
+                checkArgument(
+                        isPaimonTable(table) || 
LegacyHiveClasses.isPaimonTable(table),
+                        String.format(
+                                "Table %s is not a paimon table in hive 
metastore.",
+                                identifier.getFullName()));
+                updateHmsTablePars(table, tableSchema);
+                updateHmsTable(table, identifier, tableSchema);
+                client.alter_table(
+                        identifier.getDatabaseName(), 
identifier.getObjectName(), table, true);
+            } catch (NoSuchObjectException e) {
+                // Hive table does not exist: create it from the paimon schema with the catalog's
+                // table default options copied in.
+                HashMap<String, String> newOptions = new 
HashMap<>(paimonTable.options());
+                copyTableDefaultOptions(newOptions);
+                // This reassigned schema is also the one used for the partition repair below.
+                tableSchema = paimonTable.schema().copy(newOptions);
+                Table table =
+                        newHmsTable(
+                                identifier,
+                                
convertToPropertiesPrefixKey(tableSchema.options(), HIVE_PREFIX));
+                updateHmsTable(table, identifier, tableSchema);
+                client.createTable(table);
+            }
+
+            // Register the paimon table's partitions in the metastore.
+            repairPartition(paimonTable, identifier, tableSchema);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Registers every partition of {@code paimonTable} in the Hive metastore.
+     *
+     * <p>Does nothing for unpartitioned tables, and also for tables whose partitions are not kept
+     * in the metastore ({@link CoreOptions#partitionedTableInMetastore()} is false): for those the
+     * Hive table is created without partition columns, so adding partitions would fail.
+     */
+    private void repairPartition(
+            org.apache.paimon.table.Table paimonTable, Identifier identifier, TableSchema schema)
+            throws Exception {
+        CoreOptions options = new CoreOptions(schema.options());
+        if (schema.partitionKeys().isEmpty() || !options.partitionedTableInMetastore()) {
+            return;
+        }
+
+        ReadBuilder readBuilder = paimonTable.newReadBuilder();
+        List<BinaryRow> partitions = readBuilder.newScan().listPartitions();
+        RowDataPartitionComputer partitionComputer =
+                FileStorePathFactory.getPartitionComputer(
+                        schema.logicalPartitionType(), options.partitionDefaultName());
+        // The client is created here for this call only, so close it once partitions are added.
+        try (MetastoreClient metastoreClient = metastoreClientFactory(identifier).get().create()) {
+            for (BinaryRow partition : partitions) {
+                LinkedHashMap<String, String> partitionSpec =
+                        partitionComputer.generatePartValues(
+                                Preconditions.checkNotNull(
+                                        partition,
+                                        "Partition row data is null. This is unexpected."));
+                metastoreClient.addPartition(partitionSpec);
+            }
+        }
+    }
+
     @Override
     public void close() throws Exception {
         client.close();
@@ -523,7 +668,7 @@ public class HiveCatalog extends AbstractCatalog {
         sd.setSerdeInfo(serDeInfo);
 
         CoreOptions options = new CoreOptions(schema.options());
-        if (options.partitionedTableInMetastore() && 
schema.partitionKeys().size() > 0) {
+        if (options.partitionedTableInMetastore() && 
!schema.partitionKeys().isEmpty()) {
             Map<String, DataField> fieldMap =
                     schema.fields().stream()
                             .collect(Collectors.toMap(DataField::name, 
Function.identity()));
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
index 5a915d200..ac140d949 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
@@ -1167,6 +1167,115 @@ public abstract class HiveCatalogITCaseBase {
         insertSql.getJobClient().get().cancel();
     }
 
+    @Test
+    public void testRepairTable() throws Exception {
+        // Creates test_db.t_repair_hive through the paimon file system catalog only.
+        TableEnvironment fileCatalogEnv = useFileCatalog();
+        // Database test_db exists in hive metastore.
+        hiveShell.execute("use test_db");
+        // The Hive table does not exist yet: repairing the paimon table creates it in the Hive
+        // metastore.
+        tEnv.executeSql("CALL sys.repair('test_db.t_repair_hive')").await();
+
+        assertThat(hiveShell.executeQuery("SHOW PARTITIONS t_repair_hive"))
+                .containsExactlyInAnyOrder("dt=2020-01-02/hh=09");
+
+        // Add a column and a new partition without going through the Hive metastore.
+        alterTableInFileSystem(fileCatalogEnv);
+
+        // The Hive table now exists: repairing again must sync the new column and partition into
+        // the Hive metastore.
+        tEnv.executeSql("CALL sys.repair('test_db.t_repair_hive')").await();
+        assertThat(hiveShell.executeQuery("DESC FORMATTED t_repair_hive"))
+                .contains("item_id\tbigint\titem id");
+        assertThat(hiveShell.executeQuery("SHOW PARTITIONS t_repair_hive"))
+                .containsExactlyInAnyOrder("dt=2020-01-02/hh=09", "dt=2020-01-03/hh=10");
+    }
+
+    @Test
+    public void testRepairTableWithCustomLocation() throws Exception {
+        // Creates test_db.t_repair_hive through the paimon file system catalog only.
+        TableEnvironment fileCatalogEnv = useFileCatalog();
+        // The Hive database uses a custom location that points at the paimon database directory.
+        String databaseLocation = path + "test_db.db";
+        hiveShell.execute("CREATE DATABASE my_database\nLOCATION '" + databaseLocation + "';");
+        hiveShell.execute("USE my_database");
+
+        // The Hive table does not exist yet: repairing the paimon table creates it in the Hive
+        // metastore, located under the custom database location.
+        tEnv.executeSql("CALL sys.repair('my_database.t_repair_hive')").await();
+
+        String tableLocation = databaseLocation + "/t_repair_hive";
+        String expectedLocationLine = "Location:           \t" + tableLocation + "\tNULL";
+        assertThat(hiveShell.executeQuery("DESC FORMATTED t_repair_hive"))
+                .contains(expectedLocationLine);
+        assertThat(hiveShell.executeQuery("SHOW PARTITIONS t_repair_hive"))
+                .containsExactlyInAnyOrder("dt=2020-01-02/hh=09");
+
+        // Add a column and a new partition without going through the Hive metastore.
+        alterTableInFileSystem(fileCatalogEnv);
+
+        // The Hive table now exists: repairing again must keep the location and sync the new
+        // column and partition into the Hive metastore.
+        tEnv.executeSql("CALL sys.repair('my_database.t_repair_hive')").await();
+        assertThat(hiveShell.executeQuery("DESC FORMATTED t_repair_hive"))
+                .contains(expectedLocationLine, "item_id\tbigint\titem id");
+        assertThat(hiveShell.executeQuery("SHOW PARTITIONS t_repair_hive"))
+                .containsExactlyInAnyOrder("dt=2020-01-02/hh=09", "dt=2020-01-03/hh=10");
+    }
+
+    /**
+     * Evolves {@code t_repair_hive} through the paimon file system catalog only. It adds an
+     * {@code item_id} column after {@code user_id} and writes one row into the new partition
+     * {@code dt=2020-01-03/hh=10}, which gives a later repair both a schema change and a partition
+     * change to sync.
+     *
+     * @param fileCatalogEnv table environment whose current database contains {@code
+     *     t_repair_hive}, as returned by {@code useFileCatalog()}
+     */
+    private void alterTableInFileSystem(TableEnvironment fileCatalogEnv) throws Exception {
+        fileCatalogEnv
+                .executeSql(
+                        "ALTER TABLE t_repair_hive ADD item_id BIGINT COMMENT 'item id' AFTER user_id")
+                .await();
+        fileCatalogEnv
+                .executeSql("INSERT INTO t_repair_hive VALUES(2, 1, 'click', '2020-01-03', '10')")
+                .await();
+    }
+
+    /**
+     * Creates a batch {@link TableEnvironment} on a paimon file system catalog {@code my_file}
+     * whose warehouse is {@code path}. It then creates the partitioned primary-key table {@code
+     * test_db.t_repair_hive} (with {@code metastore.partitioned-table = true}) and inserts one row
+     * into partition {@code dt=2020-01-02/hh=09}.
+     *
+     * <p>The table is written only through the file system catalog. The repair tests rely on it
+     * being absent from the Hive metastore until {@code sys.repair} is called.
+     *
+     * @return the file system catalog environment, with {@code my_file.test_db} as the current
+     *     catalog and database
+     */
+    private TableEnvironment useFileCatalog() throws Exception {
+        String createCatalogDdl =
+                "CREATE CATALOG my_file WITH ( "
+                        + "'type' = 'paimon',\n"
+                        + "'warehouse' = '"
+                        + path
+                        + "' "
+                        + ")";
+        // Named differently from the Hive-catalog field `tEnv` to avoid shadowing it.
+        TableEnvironment fileCatalogEnv =
+                TableEnvironmentImpl.create(EnvironmentSettings.newInstance().inBatchMode().build());
+        fileCatalogEnv.executeSql(createCatalogDdl).await();
+        fileCatalogEnv.executeSql("USE CATALOG my_file").await();
+
+        // Prepare a paimon table in the paimon file system.
+        fileCatalogEnv.executeSql("CREATE DATABASE IF NOT EXISTS test_db;").await();
+        fileCatalogEnv.executeSql("USE test_db").await();
+        fileCatalogEnv
+                .executeSql(
+                        "CREATE TABLE t_repair_hive (\n"
+                                + "    user_id BIGINT,\n"
+                                + "    behavior STRING,\n"
+                                + "    dt STRING,\n"
+                                + "    hh STRING,\n"
+                                + "    PRIMARY KEY (dt, hh, user_id) NOT ENFORCED\n"
+                                + ") PARTITIONED BY (dt, hh)"
+                                + " WITH (\n"
+                                + "'metastore.partitioned-table' = 'true'\n"
+                                + ");")
+                .await();
+        fileCatalogEnv
+                .executeSql("INSERT INTO t_repair_hive VALUES(1, 'login', '2020-01-02', '09')")
+                .await();
+        return fileCatalogEnv;
+    }
+
     private void assertNoPrivilege(Executable executable) {
         Exception e = assertThrows(Exception.class, executable);
         if (e.getCause() != null) {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
index 4af371c69..a61642bea 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
@@ -27,6 +27,7 @@ import 
org.apache.paimon.spark.procedure.MigrateTableProcedure;
 import org.apache.paimon.spark.procedure.Procedure;
 import org.apache.paimon.spark.procedure.ProcedureBuilder;
 import org.apache.paimon.spark.procedure.RemoveOrphanFilesProcedure;
+import org.apache.paimon.spark.procedure.RepairProcedure;
 import org.apache.paimon.spark.procedure.RollbackProcedure;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
@@ -58,6 +59,7 @@ public class SparkProcedures {
         procedureBuilders.put("migrate_file", MigrateFileProcedure::builder);
         procedureBuilders.put("remove_orphan_files", 
RemoveOrphanFilesProcedure::builder);
         procedureBuilders.put("expire_snapshots", 
ExpireSnapshotsProcedure::builder);
+        procedureBuilders.put("repair", RepairProcedure::builder);
         return procedureBuilders.build();
     }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateTableProcedure.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateTableProcedure.java
index 881e9d7d3..87090dd92 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateTableProcedure.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateTableProcedure.java
@@ -81,7 +81,7 @@ public class MigrateTableProcedure extends BaseProcedure {
         String format = args.getString(0);
         String sourceTable = args.getString(1);
         String properties = args.isNullAt(2) ? null : args.getString(2);
-        boolean deleteNeed = args.isNullAt(3) ? true : args.getBoolean(3);
+        boolean deleteNeed = args.isNullAt(3) || args.getBoolean(3);
         String targetTable = args.isNullAt(4) ? null : args.getString(4);
 
         Identifier sourceTableId = Identifier.fromString(sourceTable);
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RepairProcedure.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RepairProcedure.java
new file mode 100644
index 000000000..47e5ceb4f
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RepairProcedure.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.spark.catalog.WithPaimonCatalog;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/**
+ * Repair procedure. Depending on its optional argument, it delegates to the Paimon catalog's
+ * catalog-wide, database-level or table-level repair. Usage:
+ *
+ * <pre><code>
+ *  -- repair all databases and tables in the catalog
+ *  CALL sys.repair()
+ *
+ *  -- repair all tables in a specific database
+ *  CALL sys.repair(database_or_table => 'databaseName')
+ *
+ *  -- repair a specific table
+ *  CALL sys.repair(database_or_table => 'databaseName.tableName')
+ * </code></pre>
+ */
+public class RepairProcedure extends BaseProcedure {
+
+    /** One optional argument: absent/blank, {@code database} or {@code database.table}. */
+    private static final ProcedureParameter[] PARAMETERS =
+            new ProcedureParameter[] {
+                ProcedureParameter.optional("database_or_table", StringType)
+            };
+
+    private static final StructType OUTPUT_TYPE =
+            new StructType(
+                    new StructField[] {
+                        new StructField("result", DataTypes.BooleanType, true, Metadata.empty())
+                    });
+
+    protected RepairProcedure(TableCatalog tableCatalog) {
+        super(tableCatalog);
+    }
+
+    @Override
+    public ProcedureParameter[] parameters() {
+        return PARAMETERS;
+    }
+
+    @Override
+    public StructType outputType() {
+        return OUTPUT_TYPE;
+    }
+
+    /**
+     * Runs the repair at catalog, database or table scope.
+     *
+     * @param args row holding the optional {@code database_or_table} argument at ordinal 0
+     * @return a single row with {@code result = true} once the repair call has returned
+     * @throws IllegalArgumentException if a non-blank identifier does not split into one or two
+     *     dot-separated parts
+     * @throws RuntimeException wrapping any exception raised by the catalog repair call
+     */
+    @Override
+    public InternalRow[] call(InternalRow args) {
+        Catalog paimonCatalog = ((WithPaimonCatalog) tableCatalog()).paimonCatalog();
+        // The argument is optional: getString would fail on a null slot, so check first.
+        String identifier = args.isNullAt(0) ? null : args.getString(0);
+        boolean repairWholeCatalog = identifier == null || StringUtils.isBlank(identifier);
+        String[] paths = repairWholeCatalog ? new String[0] : identifier.split("\\.");
+
+        // Validate up front so a malformed identifier is reported as-is rather than being
+        // wrapped in the generic repair error below.
+        if (!repairWholeCatalog && (paths.length < 1 || paths.length > 2)) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Cannot parse '%s' as a database or table identifier, "
+                                    + "expected 'database' or 'database.table'.",
+                            identifier));
+        }
+
+        try {
+            if (repairWholeCatalog) {
+                paimonCatalog.repairCatalog();
+            } else if (paths.length == 1) {
+                paimonCatalog.repairDatabase(paths[0]);
+            } else {
+                paimonCatalog.repairTable(Identifier.create(paths[0], paths[1]));
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Call repair error", e);
+        }
+        return new InternalRow[] {newInternalRow(true)};
+    }
+
+    public static ProcedureBuilder builder() {
+        return new BaseProcedure.Builder<RepairProcedure>() {
+            @Override
+            public RepairProcedure doBuild() {
+                return new RepairProcedure(tableCatalog());
+            }
+        };
+    }
+
+    @Override
+    public String description() {
+        return "RepairProcedure";
+    }
+}


Reply via email to