This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 592bd24b545b2ce469df481947f15a9a2f2d7f6a
Author: Jingsong <[email protected]>
AuthorDate: Fri May 24 16:17:12 2024 +0800

    [core][hive] Optimize codes for repairing metastore procedure
---
 .../org/apache/paimon/catalog/AbstractCatalog.java |  64 +++----
 .../apache/paimon/catalog/FileSystemCatalog.java   |  15 +-
 .../java/org/apache/paimon/hive/HiveCatalog.java   | 194 ++++++++-------------
 .../apache/paimon/hive/HiveMetastoreClient.java    |   2 +-
 4 files changed, 111 insertions(+), 164 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 63428635e..f93198fb8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -42,6 +42,7 @@ import org.apache.paimon.utils.StringUtils;
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -50,7 +51,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
-import java.util.concurrent.Callable;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.options.CatalogOptions.LINEAGE_META;
@@ -127,17 +127,6 @@ public abstract class AbstractCatalog implements Catalog {
         return catalogOptions.get(LOCK_ENABLED);
     }
 
-    protected List<String> listDatabases(Path warehouse) {
-        List<String> databases = new ArrayList<>();
-        for (FileStatus status : uncheck(() -> fileIO.listDirectories(warehouse))) {
-            Path path = status.getPath();
-            if (status.isDir() && isDatabase(path)) {
-                databases.add(database(path));
-            }
-        }
-        return databases;
-    }
-
     @Override
     public boolean databaseExists(String databaseName) {
         if (isSystemDatabase(databaseName)) {
@@ -220,22 +209,8 @@ public abstract class AbstractCatalog implements Catalog {
         return listTablesImpl(databaseName).stream().sorted().collect(Collectors.toList());
     }
 
-    protected List<String> listTablesImpl(Path databasePath) {
-        List<String> tables = new ArrayList<>();
-        for (FileStatus status : uncheck(() -> fileIO.listDirectories(databasePath))) {
-            if (status.isDir() && tableExists(status.getPath())) {
-                tables.add(status.getPath().getName());
-            }
-        }
-        return tables;
-    }
-
     protected abstract List<String> listTablesImpl(String databaseName);
 
-    protected boolean tableExists(Path tablePath) {
-        return !new SchemaManager(fileIO, tablePath).listAllIds().isEmpty();
-    }
-
     @Override
     public void dropTable(Identifier identifier, boolean ignoreIfNotExists)
             throws TableNotExistException {
@@ -525,20 +500,35 @@ public abstract class AbstractCatalog implements Catalog {
                         CoreOptions.AUTO_CREATE.key(), Boolean.FALSE));
     }
 
-    private static boolean isDatabase(Path path) {
-        return path.getName().endsWith(DB_SUFFIX);
-    }
+    // =============================== Meta in File System =====================================
 
-    private static String database(Path path) {
-        String name = path.getName();
-        return name.substring(0, name.length() - DB_SUFFIX.length());
+    protected List<String> listDatabasesInFileSystem(Path warehouse) throws IOException {
+        List<String> databases = new ArrayList<>();
+        for (FileStatus status : fileIO.listDirectories(warehouse)) {
+            Path path = status.getPath();
+            if (status.isDir() && path.getName().endsWith(DB_SUFFIX)) {
+                String fileName = path.getName();
+                databases.add(fileName.substring(0, fileName.length() - DB_SUFFIX.length()));
+            }
+        }
+        return databases;
     }
 
-    protected static <T> T uncheck(Callable<T> callable) {
-        try {
-            return callable.call();
-        } catch (Exception e) {
-            throw new RuntimeException(e);
+    protected List<String> listTablesInFileSystem(Path databasePath) throws IOException {
+        List<String> tables = new ArrayList<>();
+        for (FileStatus status : fileIO.listDirectories(databasePath)) {
+            if (status.isDir() && tableExistsInFileSystem(status.getPath())) {
+                tables.add(status.getPath().getName());
+            }
         }
+        return tables;
+    }
+
+    protected boolean tableExistsInFileSystem(Path tablePath) {
+        return !new SchemaManager(fileIO, tablePath).listAllIds().isEmpty();
+    }
+
+    public Optional<TableSchema> tableSchemaInFileSystem(Path tablePath) {
+        return new SchemaManager(fileIO, tablePath).latest();
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
index 60e688bf8..f0e5572e3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
 
 import static org.apache.paimon.catalog.FileSystemCatalogOptions.CASE_SENSITIVE;
 
@@ -55,7 +56,7 @@ public class FileSystemCatalog extends AbstractCatalog {
 
     @Override
     public List<String> listDatabases() {
-        return listDatabases(warehouse);
+        return uncheck(() -> listDatabasesInFileSystem(warehouse));
     }
 
     @Override
@@ -89,7 +90,7 @@ public class FileSystemCatalog extends AbstractCatalog {
 
     @Override
     protected List<String> listTablesImpl(String databaseName) {
-        return listTablesImpl(newDatabasePath(databaseName));
+        return uncheck(() -> listTablesInFileSystem(newDatabasePath(databaseName)));
     }
 
     @Override
@@ -98,7 +99,7 @@ public class FileSystemCatalog extends AbstractCatalog {
             return super.tableExists(identifier);
         }
 
-        return tableExists(getDataTableLocation(identifier));
+        return tableExistsInFileSystem(getDataTableLocation(identifier));
     }
 
     @Override
@@ -149,6 +150,14 @@ public class FileSystemCatalog extends AbstractCatalog {
         schemaManager(identifier).commitChanges(changes);
     }
 
+    protected static <T> T uncheck(Callable<T> callable) {
+        try {
+            return callable.call();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     @Override
     public void close() throws Exception {}
 
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 77a3b40af..25cb8784d 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -41,16 +41,9 @@ import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.CatalogEnvironment;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.TableType;
-import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
-import org.apache.paimon.utils.FileStorePathFactory;
-import org.apache.paimon.utils.Preconditions;
-import org.apache.paimon.utils.RowDataPartitionComputer;
 
 import org.apache.flink.table.hive.LegacyHiveClasses;
 import org.apache.hadoop.conf.Configuration;
@@ -80,13 +73,11 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -99,7 +90,6 @@ import static org.apache.paimon.hive.HiveCatalogOptions.IDENTIFIER;
 import static org.apache.paimon.hive.HiveCatalogOptions.LOCATION_IN_PROPERTIES;
 import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE;
 import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
-import static org.apache.paimon.utils.FileUtils.COMMON_IO_FORK_JOIN_POOL;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly;
 
@@ -333,12 +323,14 @@ public class HiveCatalog extends AbstractCatalog {
                     e);
         }
 
-        return isPaimonTable(table) || LegacyHiveClasses.isPaimonTable(table);
+        return isPaimonTable(table);
     }
 
     private static boolean isPaimonTable(Table table) {
-        return INPUT_FORMAT_CLASS_NAME.equals(table.getSd().getInputFormat())
-                && OUTPUT_FORMAT_CLASS_NAME.equals(table.getSd().getOutputFormat());
+        boolean isPaimonTable =
+                INPUT_FORMAT_CLASS_NAME.equals(table.getSd().getInputFormat())
+                        && OUTPUT_FORMAT_CLASS_NAME.equals(table.getSd().getOutputFormat());
+        return isPaimonTable || LegacyHiveClasses.isPaimonTable(table);
     }
 
     @Override
@@ -347,14 +339,8 @@ public class HiveCatalog extends AbstractCatalog {
             throw new TableNotExistException(identifier);
         }
         Path tableLocation = getDataTableLocation(identifier);
-        return getDataTableSchema(tableLocation);
-    }
-
-    private TableSchema getDataTableSchema(Path tableLocation) {
-        return new SchemaManager(fileIO, tableLocation)
-                .latest()
-                .orElseThrow(
-                        () -> new RuntimeException("There is no paimon table in " + tableLocation));
+        return tableSchemaInFileSystem(tableLocation)
+                .orElseThrow(() -> new TableNotExistException(identifier));
     }
 
     private boolean usingExternalTable() {
@@ -408,13 +394,8 @@ public class HiveCatalog extends AbstractCatalog {
                     e);
         }
 
-        Table table =
-                newHmsTable(
-                        identifier,
-                        convertToPropertiesPrefixKey(tableSchema.options(), HIVE_PREFIX));
         try {
-            updateHmsTable(table, identifier, tableSchema);
-            client.createTable(table);
+            client.createTable(createHiveTable(identifier, tableSchema));
         } catch (Exception e) {
             Path path = getDataTableLocation(identifier);
             try {
@@ -426,6 +407,15 @@ public class HiveCatalog extends AbstractCatalog {
         }
     }
 
+    private Table createHiveTable(Identifier identifier, TableSchema tableSchema) {
+        Table table =
+                newHmsTable(
+                        identifier,
+                        convertToPropertiesPrefixKey(tableSchema.options(), HIVE_PREFIX));
+        updateHmsTable(table, identifier, tableSchema);
+        return table;
+    }
+
     @Override
     protected void renameTableImpl(Identifier fromTable, Identifier toTable) {
         try {
@@ -469,18 +459,21 @@ public class HiveCatalog extends AbstractCatalog {
         TableSchema schema = schemaManager.commitChanges(changes);
 
         try {
-            // sync to hive hms
             Table table = client.getTable(identifier.getDatabaseName(), identifier.getObjectName());
-            updateHmsTablePars(table, schema);
-            updateHmsTable(table, identifier, schema);
-            client.alter_table(
-                    identifier.getDatabaseName(), identifier.getObjectName(), table, true);
+            alterTableToHms(table, identifier, schema);
         } catch (Exception te) {
             schemaManager.deleteSchema(schema.id());
             throw new RuntimeException(te);
         }
     }
 
+    private void alterTableToHms(Table table, Identifier identifier, TableSchema newSchema)
+            throws TException {
+        updateHmsTablePars(table, newSchema);
+        updateHmsTable(table, identifier, newSchema);
+        client.alter_table(identifier.getDatabaseName(), identifier.getObjectName(), table, true);
+    }
+
     @Override
     public boolean caseSensitive() {
         return false;
@@ -488,7 +481,12 @@ public class HiveCatalog extends AbstractCatalog {
 
     @Override
     public void repairCatalog() {
-        List<String> databases = listDatabases(new Path(warehouse));
+        List<String> databases = null;
+        try {
+            databases = listDatabasesInFileSystem(new Path(warehouse));
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
         for (String database : databases) {
             repairDatabase(database);
         }
@@ -496,45 +494,31 @@ public class HiveCatalog extends AbstractCatalog {
 
     @Override
     public void repairDatabase(String databaseName) {
-        Path databasePath = repairHmsDatabase(databaseName);
-        List<String> tables = listTablesImpl(databasePath);
-        CompletableFuture<Void> allOf =
-                CompletableFuture.allOf(
-                        tables.stream()
-                                .map(table -> Identifier.create(databaseName, table))
-                                .map(
-                                        identifier ->
-                                                CompletableFuture.runAsync(
-                                                        () -> {
-                                                            try {
-                                                                repairTable(identifier);
-                                                            } catch (TableNotExistException e) {
-                                                                LOG.error(
-                                                                        "Table {} does not exist in the paimon.",
-                                                                        identifier.getFullName());
-                                                                // ignore
-                                                            }
-                                                        },
-                                                        COMMON_IO_FORK_JOIN_POOL))
-                                .toArray(CompletableFuture[]::new));
-        allOf.join();
-    }
-
-    private Path repairHmsDatabase(String databaseName) {
         checkNotSystemDatabase(databaseName);
+
+        // create database if needed
         if (!databaseExistsImpl(databaseName)) {
             createDatabaseImpl(databaseName, Collections.emptyMap());
         }
 
+        // tables from file system
+        List<String> tables;
         try {
-            Database database = client.getDatabase(databaseName);
-            return new Path(locationHelper.getDatabaseLocation(database));
+            tables =
+                    listTablesInFileSystem(
+                            new Path(
+                                    locationHelper.getDatabaseLocation(
+                                            client.getDatabase(databaseName))));
         } catch (Exception e) {
-            throw new RuntimeException(
-                    "Failed to determine if database "
-                            + databaseName
-                            + " exists in hive metastore.",
-                    e);
+            throw new RuntimeException(e);
+        }
+
+        // repair tables
+        for (String table : tables) {
+            try {
+                repairTable(Identifier.create(databaseName, table));
+            } catch (TableNotExistException ignore) {
+            }
         }
     }
 
@@ -543,79 +527,43 @@ public class HiveCatalog extends AbstractCatalog {
         checkNotSystemTable(identifier, "repairTable");
         validateIdentifierNameCaseInsensitive(identifier);
 
-        // Get paimon table from file system.
-        Path paimonTableLocation = getDataTableLocation(identifier);
-        TableSchema tableSchema = getDataTableSchema(paimonTableLocation);
-        validateFieldNameCaseInsensitive(tableSchema.fieldNames());
-        FileStoreTable paimonTable =
-                FileStoreTableFactory.create(
-                        fileIO,
-                        paimonTableLocation,
-                        tableSchema,
-                        new CatalogEnvironment(
-                                Lock.factory(
-                                        lockFactory().orElse(null),
-                                        lockContext().orElse(null),
-                                        identifier),
-                                
super.metastoreClientFactory(identifier).orElse(null),
-                                lineageMetaFactory));
-
+        TableSchema tableSchema =
+                tableSchemaInFileSystem(getDataTableLocation(identifier))
+                        .orElseThrow(() -> new TableNotExistException(identifier));
+        Table newTable = createHiveTable(identifier, tableSchema);
         try {
             try {
                 Table table =
                         client.getTable(identifier.getDatabaseName(), identifier.getObjectName());
                 checkArgument(
-                        isPaimonTable(table) || LegacyHiveClasses.isPaimonTable(table),
-                        String.format(
-                                "Table %s is not a paimon table in hive metastore.",
-                                identifier.getFullName()));
-                updateHmsTablePars(table, tableSchema);
-                updateHmsTable(table, identifier, tableSchema);
-                client.alter_table(
-                        identifier.getDatabaseName(), identifier.getObjectName(), table, true);
+                        isPaimonTable(table),
+                        "Table %s is not a paimon table in hive metastore.",
+                        identifier.getFullName());
+                if (!newTable.getSd().getCols().equals(table.getSd().getCols())) {
+                    alterTableToHms(table, identifier, tableSchema);
+                }
             } catch (NoSuchObjectException e) {
                 // hive table does not exist.
-                HashMap<String, String> newOptions = new HashMap<>(paimonTable.options());
-                copyTableDefaultOptions(newOptions);
-                tableSchema = paimonTable.schema().copy(newOptions);
-                Table table =
-                        newHmsTable(
-                                identifier,
-                                convertToPropertiesPrefixKey(tableSchema.options(), HIVE_PREFIX));
-                updateHmsTable(table, identifier, tableSchema);
-                client.createTable(table);
+                client.createTable(newTable);
             }
 
             // repair partitions
-            repairPartition(paimonTable, identifier, tableSchema);
+            if (!tableSchema.partitionKeys().isEmpty() && !newTable.getPartitionKeys().isEmpty()) {
+                // Do not close client, it is for HiveCatalog
+                @SuppressWarnings("resource")
+                HiveMetastoreClient metastoreClient =
+                        new HiveMetastoreClient(identifier, tableSchema, client);
+                List<BinaryRow> partitions =
+                        getTable(identifier).newReadBuilder().newScan().listPartitions();
+                for (BinaryRow partition : partitions) {
+                    metastoreClient.addPartition(partition);
+                }
+            }
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
     }
 
-    private void repairPartition(
-            org.apache.paimon.table.Table paimonTable, Identifier identifier, TableSchema schema)
-            throws Exception {
-        if (!schema.partitionKeys().isEmpty()) {
-            MetastoreClient metastoreClient = metastoreClientFactory(identifier).get().create();
-
-            ReadBuilder readBuilder = paimonTable.newReadBuilder();
-            List<BinaryRow> partitions = readBuilder.newScan().listPartitions();
-            RowDataPartitionComputer partitionComputer =
-                    FileStorePathFactory.getPartitionComputer(
-                            schema.logicalPartitionType(),
-                            new CoreOptions(schema.options()).partitionDefaultName());
-            for (BinaryRow partition : partitions) {
-                LinkedHashMap<String, String> partitionSpec =
-                        partitionComputer.generatePartValues(
-                                Preconditions.checkNotNull(
-                                        partition,
-                                        "Partition row data is null. This is unexpected."));
-                metastoreClient.addPartition(partitionSpec);
-            }
-        }
-    }
-
     @Override
     public void close() throws Exception {
         client.close();
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java
index 031b1848a..76b1f3cf2 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java
@@ -45,7 +45,7 @@ public class HiveMetastoreClient implements MetastoreClient {
     private final IMetaStoreClient client;
     private final StorageDescriptor sd;
 
-    private HiveMetastoreClient(Identifier identifier, TableSchema schema, IMetaStoreClient client)
+    public HiveMetastoreClient(Identifier identifier, TableSchema schema, IMetaStoreClient client)
             throws Exception {
         this.identifier = identifier;
         this.partitionComputer =

Reply via email to