This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 592bd24b545b2ce469df481947f15a9a2f2d7f6a Author: Jingsong <[email protected]> AuthorDate: Fri May 24 16:17:12 2024 +0800 [core][hive] Optimize codes for repairing metastore procedure --- .../org/apache/paimon/catalog/AbstractCatalog.java | 64 +++---- .../apache/paimon/catalog/FileSystemCatalog.java | 15 +- .../java/org/apache/paimon/hive/HiveCatalog.java | 194 ++++++++------------- .../apache/paimon/hive/HiveMetastoreClient.java | 2 +- 4 files changed, 111 insertions(+), 164 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index 63428635e..f93198fb8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -42,6 +42,7 @@ import org.apache.paimon.utils.StringUtils; import javax.annotation.Nullable; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -50,7 +51,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.UUID; -import java.util.concurrent.Callable; import java.util.stream.Collectors; import static org.apache.paimon.options.CatalogOptions.LINEAGE_META; @@ -127,17 +127,6 @@ public abstract class AbstractCatalog implements Catalog { return catalogOptions.get(LOCK_ENABLED); } - protected List<String> listDatabases(Path warehouse) { - List<String> databases = new ArrayList<>(); - for (FileStatus status : uncheck(() -> fileIO.listDirectories(warehouse))) { - Path path = status.getPath(); - if (status.isDir() && isDatabase(path)) { - databases.add(database(path)); - } - } - return databases; - } - @Override public boolean databaseExists(String databaseName) { if (isSystemDatabase(databaseName)) { @@ -220,22 +209,8 @@ public abstract class AbstractCatalog implements Catalog { return listTablesImpl(databaseName).stream().sorted().collect(Collectors.toList()); } - protected List<String> listTablesImpl(Path databasePath) { - List<String> tables = new ArrayList<>(); - for (FileStatus status : uncheck(() -> fileIO.listDirectories(databasePath))) { - if (status.isDir() && tableExists(status.getPath())) { - tables.add(status.getPath().getName()); - } - } - return tables; - } - protected abstract List<String> listTablesImpl(String databaseName); - protected boolean tableExists(Path tablePath) { - return !new SchemaManager(fileIO, tablePath).listAllIds().isEmpty(); - } - @Override public void dropTable(Identifier identifier, boolean ignoreIfNotExists) throws TableNotExistException { @@ -525,20 +500,35 @@ public abstract class AbstractCatalog implements Catalog { CoreOptions.AUTO_CREATE.key(), Boolean.FALSE)); } - private static boolean isDatabase(Path path) { - return path.getName().endsWith(DB_SUFFIX); - } + // =============================== Meta in File System ===================================== - private static String database(Path path) { - String name = path.getName(); - return name.substring(0, name.length() - DB_SUFFIX.length()); + protected List<String> listDatabasesInFileSystem(Path warehouse) throws IOException { + List<String> databases = new ArrayList<>(); + for (FileStatus status : fileIO.listDirectories(warehouse)) { + Path path = status.getPath(); + if (status.isDir() && path.getName().endsWith(DB_SUFFIX)) { + String fileName = path.getName(); + databases.add(fileName.substring(0, fileName.length() - DB_SUFFIX.length())); + } + } + return databases; } - protected static <T> T uncheck(Callable<T> callable) { - try { - return callable.call(); - } catch (Exception e) { - throw new RuntimeException(e); + protected List<String> listTablesInFileSystem(Path databasePath) throws IOException { + List<String> tables = new ArrayList<>(); + for (FileStatus status : fileIO.listDirectories(databasePath)) { + if (status.isDir() && tableExistsInFileSystem(status.getPath())) { + tables.add(status.getPath().getName()); + } } + return tables; + } + + protected boolean tableExistsInFileSystem(Path tablePath) { + return !new SchemaManager(fileIO, tablePath).listAllIds().isEmpty(); + } + + public Optional<TableSchema> tableSchemaInFileSystem(Path tablePath) { + return new SchemaManager(fileIO, tablePath).latest(); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java index 60e688bf8..f0e5572e3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java @@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; import static org.apache.paimon.catalog.FileSystemCatalogOptions.CASE_SENSITIVE; @@ -55,7 +56,7 @@ public class FileSystemCatalog extends AbstractCatalog { @Override public List<String> listDatabases() { - return listDatabases(warehouse); + return uncheck(() -> listDatabasesInFileSystem(warehouse)); } @Override @@ -89,7 +90,7 @@ public class FileSystemCatalog extends AbstractCatalog { @Override protected List<String> listTablesImpl(String databaseName) { - return listTablesImpl(newDatabasePath(databaseName)); + return uncheck(() -> listTablesInFileSystem(newDatabasePath(databaseName))); } @Override @@ -98,7 +99,7 @@ public class FileSystemCatalog extends AbstractCatalog { return super.tableExists(identifier); } - return tableExists(getDataTableLocation(identifier)); + return tableExistsInFileSystem(getDataTableLocation(identifier)); } @Override @@ -149,6 +150,14 @@ public class FileSystemCatalog extends AbstractCatalog { schemaManager(identifier).commitChanges(changes); } + protected static <T> T uncheck(Callable<T> callable) { + try { + return callable.call(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + @Override public void close() throws Exception {} diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index 77a3b40af..25cb8784d 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -41,16 +41,9 @@ import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; -import org.apache.paimon.table.CatalogEnvironment; -import org.apache.paimon.table.FileStoreTable; -import org.apache.paimon.table.FileStoreTableFactory; import org.apache.paimon.table.TableType; -import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; -import org.apache.paimon.utils.FileStorePathFactory; -import org.apache.paimon.utils.Preconditions; -import org.apache.paimon.utils.RowDataPartitionComputer; import org.apache.flink.table.hive.LegacyHiveClasses; import org.apache.hadoop.conf.Configuration; @@ -80,13 +73,11 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedHashMap; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.CompletableFuture; import java.util.function.Function; import java.util.stream.Collectors; @@ -99,7 +90,6 @@ import static org.apache.paimon.hive.HiveCatalogOptions.IDENTIFIER; import static org.apache.paimon.hive.HiveCatalogOptions.LOCATION_IN_PROPERTIES; import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE; import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey; -import static org.apache.paimon.utils.FileUtils.COMMON_IO_FORK_JOIN_POOL; import static org.apache.paimon.utils.Preconditions.checkArgument; import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly; @@ -333,12 +323,14 @@ public class HiveCatalog extends AbstractCatalog { e); } - return isPaimonTable(table) || LegacyHiveClasses.isPaimonTable(table); + return isPaimonTable(table); } private static boolean isPaimonTable(Table table) { - return INPUT_FORMAT_CLASS_NAME.equals(table.getSd().getInputFormat()) - && OUTPUT_FORMAT_CLASS_NAME.equals(table.getSd().getOutputFormat()); + boolean isPaimonTable = + INPUT_FORMAT_CLASS_NAME.equals(table.getSd().getInputFormat()) + && OUTPUT_FORMAT_CLASS_NAME.equals(table.getSd().getOutputFormat()); + return isPaimonTable || LegacyHiveClasses.isPaimonTable(table); } @Override @@ -347,14 +339,8 @@ public class HiveCatalog extends AbstractCatalog { throw new TableNotExistException(identifier); } Path tableLocation = getDataTableLocation(identifier); - return getDataTableSchema(tableLocation); - } - - private TableSchema getDataTableSchema(Path tableLocation) { - return new SchemaManager(fileIO, tableLocation) - .latest() - .orElseThrow( - () -> new RuntimeException("There is no paimon table in " + tableLocation)); + return tableSchemaInFileSystem(tableLocation) + .orElseThrow(() -> new TableNotExistException(identifier)); } private boolean usingExternalTable() { @@ -408,13 +394,8 @@ public class HiveCatalog extends AbstractCatalog { e); } - Table table = - newHmsTable( - identifier, - convertToPropertiesPrefixKey(tableSchema.options(), HIVE_PREFIX)); try { - updateHmsTable(table, identifier, tableSchema); - client.createTable(table); + client.createTable(createHiveTable(identifier, tableSchema)); } catch (Exception e) { Path path = getDataTableLocation(identifier); try { @@ -426,6 +407,15 @@ public class HiveCatalog extends AbstractCatalog { } } + private Table createHiveTable(Identifier identifier, TableSchema tableSchema) { + Table table = + newHmsTable( + identifier, + convertToPropertiesPrefixKey(tableSchema.options(), HIVE_PREFIX)); + updateHmsTable(table, identifier, tableSchema); + return table; + } + @Override protected void renameTableImpl(Identifier fromTable, Identifier toTable) { try { @@ -469,18 +459,21 @@ public class HiveCatalog extends AbstractCatalog { TableSchema schema = schemaManager.commitChanges(changes); try { - // sync to hive hms Table table = client.getTable(identifier.getDatabaseName(), identifier.getObjectName()); - updateHmsTablePars(table, schema); - updateHmsTable(table, identifier, schema); - client.alter_table( - identifier.getDatabaseName(), identifier.getObjectName(), table, true); + alterTableToHms(table, identifier, schema); } catch (Exception te) { schemaManager.deleteSchema(schema.id()); throw new RuntimeException(te); } } + private void alterTableToHms(Table table, Identifier identifier, TableSchema newSchema) + throws TException { + updateHmsTablePars(table, newSchema); + updateHmsTable(table, identifier, newSchema); + client.alter_table(identifier.getDatabaseName(), identifier.getObjectName(), table, true); + } + @Override public boolean caseSensitive() { return false; @@ -488,7 +481,12 @@ public class HiveCatalog extends AbstractCatalog { @Override public void repairCatalog() { - List<String> databases = listDatabases(new Path(warehouse)); + List<String> databases = null; + try { + databases = listDatabasesInFileSystem(new Path(warehouse)); + } catch (IOException e) { + throw new UncheckedIOException(e); + } for (String database : databases) { repairDatabase(database); } @@ -496,45 +494,31 @@ public class HiveCatalog extends AbstractCatalog { @Override public void repairDatabase(String databaseName) { - Path databasePath = repairHmsDatabase(databaseName); - List<String> tables = listTablesImpl(databasePath); - CompletableFuture<Void> allOf = - CompletableFuture.allOf( - tables.stream() - .map(table -> Identifier.create(databaseName, table)) - .map( - identifier -> - CompletableFuture.runAsync( - () -> { - try { - repairTable(identifier); - } catch (TableNotExistException e) { - LOG.error( - "Table {} does not exist in the paimon.", - identifier.getFullName()); - // ignore - } - }, - COMMON_IO_FORK_JOIN_POOL)) - .toArray(CompletableFuture[]::new)); - allOf.join(); - } - - private Path repairHmsDatabase(String databaseName) { checkNotSystemDatabase(databaseName); + + // create database if needed if (!databaseExistsImpl(databaseName)) { createDatabaseImpl(databaseName, Collections.emptyMap()); } + // tables from file system + List<String> tables; try { - Database database = client.getDatabase(databaseName); - return new Path(locationHelper.getDatabaseLocation(database)); + tables = + listTablesInFileSystem( + new Path( + locationHelper.getDatabaseLocation( + client.getDatabase(databaseName)))); } catch (Exception e) { - throw new RuntimeException( - "Failed to determine if database " - + databaseName - + " exists in hive metastore.", - e); + throw new RuntimeException(e); + } + + // repair tables + for (String table : tables) { + try { + repairTable(Identifier.create(databaseName, table)); + } catch (TableNotExistException ignore) { + } } } @@ -543,79 +527,43 @@ public class HiveCatalog extends AbstractCatalog { checkNotSystemTable(identifier, "repairTable"); validateIdentifierNameCaseInsensitive(identifier); - // Get paimon table from file system. - Path paimonTableLocation = getDataTableLocation(identifier); - TableSchema tableSchema = getDataTableSchema(paimonTableLocation); - validateFieldNameCaseInsensitive(tableSchema.fieldNames()); - FileStoreTable paimonTable = - FileStoreTableFactory.create( - fileIO, - paimonTableLocation, - tableSchema, - new CatalogEnvironment( - Lock.factory( - lockFactory().orElse(null), - lockContext().orElse(null), - identifier), - super.metastoreClientFactory(identifier).orElse(null), - lineageMetaFactory)); - + TableSchema tableSchema = + tableSchemaInFileSystem(getDataTableLocation(identifier)) + .orElseThrow(() -> new TableNotExistException(identifier)); + Table newTable = createHiveTable(identifier, tableSchema); try { try { Table table = client.getTable(identifier.getDatabaseName(), identifier.getObjectName()); checkArgument( - isPaimonTable(table) || LegacyHiveClasses.isPaimonTable(table), - String.format( - "Table %s is not a paimon table in hive metastore.", - identifier.getFullName())); - updateHmsTablePars(table, tableSchema); - updateHmsTable(table, identifier, tableSchema); - client.alter_table( - identifier.getDatabaseName(), identifier.getObjectName(), table, true); + isPaimonTable(table), + "Table %s is not a paimon table in hive metastore.", + identifier.getFullName()); + if (!newTable.getSd().getCols().equals(table.getSd().getCols())) { + alterTableToHms(table, identifier, tableSchema); + } } catch (NoSuchObjectException e) { // hive table does not exist. - HashMap<String, String> newOptions = new HashMap<>(paimonTable.options()); - copyTableDefaultOptions(newOptions); - tableSchema = paimonTable.schema().copy(newOptions); - Table table = - newHmsTable( - identifier, - convertToPropertiesPrefixKey(tableSchema.options(), HIVE_PREFIX)); - updateHmsTable(table, identifier, tableSchema); - client.createTable(table); + client.createTable(newTable); } // repair partitions - repairPartition(paimonTable, identifier, tableSchema); + if (!tableSchema.partitionKeys().isEmpty() && !newTable.getPartitionKeys().isEmpty()) { + // Do not close client, it is for HiveCatalog + @SuppressWarnings("resource") + HiveMetastoreClient metastoreClient = + new HiveMetastoreClient(identifier, tableSchema, client); + List<BinaryRow> partitions = + getTable(identifier).newReadBuilder().newScan().listPartitions(); + for (BinaryRow partition : partitions) { + metastoreClient.addPartition(partition); + } + } } catch (Exception e) { throw new RuntimeException(e); } } - private void repairPartition( - org.apache.paimon.table.Table paimonTable, Identifier identifier, TableSchema schema) - throws Exception { - if (!schema.partitionKeys().isEmpty()) { - MetastoreClient metastoreClient = metastoreClientFactory(identifier).get().create(); - - ReadBuilder readBuilder = paimonTable.newReadBuilder(); - List<BinaryRow> partitions = readBuilder.newScan().listPartitions(); - RowDataPartitionComputer partitionComputer = - FileStorePathFactory.getPartitionComputer( - schema.logicalPartitionType(), - new CoreOptions(schema.options()).partitionDefaultName()); - for (BinaryRow partition : partitions) { - LinkedHashMap<String, String> partitionSpec = - partitionComputer.generatePartValues( - Preconditions.checkNotNull( - partition, - "Partition row data is null. This is unexpected.")); - metastoreClient.addPartition(partitionSpec); - } - } - } - @Override public void close() throws Exception { client.close(); diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java index 031b1848a..76b1f3cf2 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java @@ -45,7 +45,7 @@ public class HiveMetastoreClient implements MetastoreClient { private final IMetaStoreClient client; private final StorageDescriptor sd; - private HiveMetastoreClient(Identifier identifier, TableSchema schema, IMetaStoreClient client) + public HiveMetastoreClient(Identifier identifier, TableSchema schema, IMetaStoreClient client) throws Exception { this.identifier = identifier; this.partitionComputer =
