This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 965412b829 [hive] Lazily initialize HMS client pool in HiveCatalog (#7346)
965412b829 is described below
commit 965412b829836ec52b56d99368b0bb5a74d182c4
Author: WenjunMin <[email protected]>
AuthorDate: Sat Mar 7 11:00:32 2026 +0800
[hive] Lazily initialize HMS client pool in HiveCatalog (#7346)
---
.../java/org/apache/paimon/hive/HiveCatalog.java | 254 ++++++++++++---------
.../paimon/hive/pool/TestCachedClientPool.java | 15 +-
2 files changed, 152 insertions(+), 117 deletions(-)
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 38ecd9372d..ac0706b40b 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -157,7 +157,7 @@ public class HiveCatalog extends AbstractCatalog {
private final HiveConf hiveConf;
private final String clientClassName;
private final Options options;
- private final ClientPool<IMetaStoreClient, TException> clients;
+ private volatile ClientPool<IMetaStoreClient, TException> clients;
private final String warehouse;
private final LocationHelper locationHelper;
@@ -188,8 +188,20 @@ public class HiveCatalog extends AbstractCatalog {
hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
warehouse);
locationHelper = new StorageLocationHelper();
}
+ }
- this.clients = new CachedClientPool(hiveConf, options,
clientClassName);
+ private ClientPool<IMetaStoreClient, TException> clients() {
+ ClientPool<IMetaStoreClient, TException> local = clients;
+ if (local == null) {
+ synchronized (this) {
+ local = clients;
+ if (local == null) {
+ local = new CachedClientPool(hiveConf, options,
clientClassName);
+ clients = local;
+ }
+ }
+ }
+ return local;
}
private boolean formatTableDisabled() {
@@ -246,25 +258,28 @@ public class HiveCatalog extends AbstractCatalog {
String databaseName = identifier.getDatabaseName();
String tableName = identifier.getTableName();
Optional<Path> tablePath =
- clients.run(
- client -> {
- if (table != null) {
- String location =
locationHelper.getTableLocation(table);
- if (location != null) {
- return Optional.of(new Path(location));
- }
- } else {
- // If the table does not exist,
- // we should use the database path to
generate the table path.
- String dbLocation =
- locationHelper.getDatabaseLocation(
-
client.getDatabase(databaseName));
- if (dbLocation != null) {
- return Optional.of(new
Path(dbLocation, tableName));
- }
- }
- return Optional.empty();
- });
+ clients()
+ .run(
+ client -> {
+ if (table != null) {
+ String location =
+
locationHelper.getTableLocation(table);
+ if (location != null) {
+ return Optional.of(new
Path(location));
+ }
+ } else {
+ // If the table does not exist,
+ // we should use the database path
to generate the table
+ // path.
+ String dbLocation =
+
locationHelper.getDatabaseLocation(
+
client.getDatabase(databaseName));
+ if (dbLocation != null) {
+ return Optional.of(new
Path(dbLocation, tableName));
+ }
+ }
+ return Optional.empty();
+ });
return tablePath.orElse(super.getTableLocation(identifier));
} catch (TException e) {
throw new RuntimeException("Can not get table " + identifier + "
from metastore.", e);
@@ -278,7 +293,7 @@ public class HiveCatalog extends AbstractCatalog {
@Override
public List<String> listDatabases() {
try {
- return clients.run(IMetaStoreClient::getAllDatabases);
+ return clients().run(IMetaStoreClient::getAllDatabases);
} catch (TException e) {
throw new RuntimeException("Failed to list all databases", e);
} catch (InterruptedException e) {
@@ -297,7 +312,7 @@ public class HiveCatalog extends AbstractCatalog {
: new Path(database.getLocationUri());
locationHelper.createPathIfRequired(databasePath,
fileIO(databasePath));
locationHelper.specifyDatabaseLocation(databasePath, database);
- clients.execute(client -> client.createDatabase(database));
+ clients().execute(client -> client.createDatabase(database));
} catch (TException | IOException e) {
throw new RuntimeException("Failed to create database " + name, e);
} catch (InterruptedException e) {
@@ -330,7 +345,7 @@ public class HiveCatalog extends AbstractCatalog {
public org.apache.paimon.catalog.Database getDatabaseImpl(String name)
throws DatabaseNotExistException {
try {
- Database database = clients.run(client ->
client.getDatabase(name));
+ Database database = clients().run(client ->
client.getDatabase(name));
Map<String, String> options = new
HashMap<>(database.getParameters());
if (database.getDescription() != null) {
options.put(COMMENT_PROP, database.getDescription());
@@ -416,7 +431,7 @@ public class HiveCatalog extends AbstractCatalog {
hivePartitions.add(hivePartition);
}
try {
- clients.execute(client -> client.add_partitions(hivePartitions,
true, false));
+ clients().execute(client -> client.add_partitions(hivePartitions,
true, false));
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -436,13 +451,14 @@ public class HiveCatalog extends AbstractCatalog {
for (Map<String, String> part : metaPartitions) {
List<String> partitionValues = new ArrayList<>(part.values());
try {
- clients.execute(
- client ->
- client.dropPartition(
- identifier.getDatabaseName(),
- identifier.getTableName(),
- partitionValues,
- false));
+ clients()
+ .execute(
+ client ->
+ client.dropPartition(
+
identifier.getDatabaseName(),
+ identifier.getTableName(),
+ partitionValues,
+ false));
} catch (NoSuchObjectException e) {
// do nothing if the partition not exists
} catch (Exception e) {
@@ -500,22 +516,24 @@ public class HiveCatalog extends AbstractCatalog {
try {
Partition hivePartition =
- clients.run(
- client ->
- client.getPartition(
-
identifier.getDatabaseName(),
- identifier.getObjectName(),
- partitionValues));
+ clients()
+ .run(
+ client ->
+ client.getPartition(
+
identifier.getDatabaseName(),
+
identifier.getObjectName(),
+ partitionValues));
hivePartition.setValues(partitionValues);
hivePartition.setLastAccessTime(
(int) (partition.lastFileCreationTime() / 1000));
hivePartition.getParameters().putAll(statistic);
- clients.execute(
- client ->
- client.alter_partition(
- identifier.getDatabaseName(),
- identifier.getObjectName(),
- hivePartition));
+ clients()
+ .execute(
+ client ->
+ client.alter_partition(
+
identifier.getDatabaseName(),
+ identifier.getObjectName(),
+ hivePartition));
} catch (NoSuchObjectException e) {
// do nothing if the partition not exists
} catch (Exception e) {
@@ -529,16 +547,17 @@ public class HiveCatalog extends AbstractCatalog {
public void markDonePartitions(Identifier identifier, List<Map<String,
String>> partitions)
throws TableNotExistException {
try {
- clients.execute(
- client -> {
- for (Map<String, String> partition : partitions) {
- client.markPartitionForEvent(
- identifier.getDatabaseName(),
- identifier.getTableName(),
- partition,
- PartitionEventType.LOAD_DONE);
- }
- });
+ clients()
+ .execute(
+ client -> {
+ for (Map<String, String> partition :
partitions) {
+ client.markPartitionForEvent(
+ identifier.getDatabaseName(),
+ identifier.getTableName(),
+ partition,
+ PartitionEventType.LOAD_DONE);
+ }
+ });
} catch (NoSuchObjectException e) {
// do nothing if the partition not exists
} catch (UnknownTableException e) {
@@ -598,12 +617,13 @@ public class HiveCatalog extends AbstractCatalog {
@VisibleForTesting
public List<Partition> listPartitionsFromHms(Identifier identifier)
throws TException, InterruptedException {
- return clients.run(
- client ->
- client.listPartitions(
- identifier.getDatabaseName(),
- identifier.getTableName(),
- Short.MAX_VALUE));
+ return clients()
+ .run(
+ client ->
+ client.listPartitions(
+ identifier.getDatabaseName(),
+ identifier.getTableName(),
+ Short.MAX_VALUE));
}
private List<Map<String, String>> removePartitionsExistsInOtherBranches(
@@ -647,10 +667,10 @@ public class HiveCatalog extends AbstractCatalog {
@Override
protected void dropDatabaseImpl(String name) {
try {
- Database database = clients.run(client ->
client.getDatabase(name));
+ Database database = clients().run(client ->
client.getDatabase(name));
Path location = new
Path(locationHelper.getDatabaseLocation(database));
locationHelper.dropPathIfRequired(location, fileIO(location));
- clients.execute(client -> client.dropDatabase(name, true, false,
true));
+ clients().execute(client -> client.dropDatabase(name, true, false,
true));
} catch (TException | IOException e) {
throw new RuntimeException("Failed to drop database " + name, e);
} catch (InterruptedException e) {
@@ -662,7 +682,7 @@ public class HiveCatalog extends AbstractCatalog {
@Override
protected void alterDatabaseImpl(String name, List<PropertyChange>
changes) {
try {
- Database database = clients.run(client ->
client.getDatabase(name));
+ Database database = clients().run(client ->
client.getDatabase(name));
Map<String, String> parameter = new
HashMap<>(database.getParameters());
Pair<Map<String, String>, Set<String>> setPropertiesToRemoveKeys =
PropertyChange.getSetPropertiesToRemoveKeys(changes);
@@ -675,7 +695,7 @@ public class HiveCatalog extends AbstractCatalog {
parameter.keySet().removeAll(removeKeys);
}
Database alterDatabase = convertToHiveDatabase(name, parameter);
- clients.execute(client -> client.alterDatabase(name,
alterDatabase));
+ clients().execute(client -> client.alterDatabase(name,
alterDatabase));
} catch (TException e) {
throw new RuntimeException("Failed to alter database " + name, e);
} catch (InterruptedException e) {
@@ -687,14 +707,14 @@ public class HiveCatalog extends AbstractCatalog {
@Override
protected List<String> listTablesImpl(String databaseName) {
try {
- List<String> tableNames = clients.run(client ->
client.getAllTables(databaseName));
+ List<String> tableNames = clients().run(client ->
client.getAllTables(databaseName));
int batchSize = getBatchGetTableSize();
List<Table> hmsTables =
Lists.partition(tableNames, batchSize).stream()
.flatMap(
batchTableNames -> {
try {
- return clients
+ return clients()
.run(
client ->
client.getTableObjectsByName(
@@ -851,7 +871,7 @@ public class HiveCatalog extends AbstractCatalog {
sd.setCols(columns);
try {
- clients.execute(client -> client.createTable(hiveTable));
+ clients().execute(client -> client.createTable(hiveTable));
} catch (Exception e) {
// we don't need to delete directories since HMS will roll back db
and fs if failed.
throw new RuntimeException("Failed to create table " +
identifier.getFullName(), e);
@@ -871,14 +891,15 @@ public class HiveCatalog extends AbstractCatalog {
}
try {
- clients.execute(
- client ->
- client.dropTable(
- identifier.getDatabaseName(),
- identifier.getTableName(),
- false,
- false,
- false));
+ clients()
+ .execute(
+ client ->
+ client.dropTable(
+ identifier.getDatabaseName(),
+ identifier.getTableName(),
+ false,
+ false,
+ false));
} catch (TException e) {
throw new RuntimeException("Failed to drop view " +
identifier.getFullName(), e);
} catch (InterruptedException e) {
@@ -896,8 +917,8 @@ public class HiveCatalog extends AbstractCatalog {
getDatabase(databaseName);
try {
- return clients.run(
- client -> client.getTables(databaseName, "*",
TableType.VIRTUAL_VIEW));
+ return clients()
+ .run(client -> client.getTables(databaseName, "*",
TableType.VIRTUAL_VIEW));
} catch (TException e) {
throw new RuntimeException("Failed to list views in database " +
databaseName, e);
} catch (InterruptedException e) {
@@ -994,7 +1015,7 @@ public class HiveCatalog extends AbstractCatalog {
Path location = pair.getLeft();
boolean externalTable = pair.getRight();
Table hiveTable = createHiveFormatTable(identifier, newSchema,
location, externalTable);
- clients.execute(client -> client.createTable(hiveTable));
+ clients().execute(client -> client.createTable(hiveTable));
} catch (Exception e) {
// we don't need to delete directories since HMS will roll back db
and fs if failed.
throw new RuntimeException("Failed to create table " +
identifier.getFullName(), e);
@@ -1019,14 +1040,15 @@ public class HiveCatalog extends AbstractCatalog {
protected void dropTableImpl(Identifier identifier, List<Path>
externalPaths) {
try {
boolean externalTable = isExternalTable(getHmsTable(identifier));
- clients.execute(
- client ->
- client.dropTable(
- identifier.getDatabaseName(),
- identifier.getTableName(),
- !externalTable,
- false,
- true));
+ clients()
+ .execute(
+ client ->
+ client.dropTable(
+ identifier.getDatabaseName(),
+ identifier.getTableName(),
+ !externalTable,
+ false,
+ true));
// When drop a Hive external table, only the hive metadata is
deleted and the data files
// are not deleted.
@@ -1064,11 +1086,12 @@ public class HiveCatalog extends AbstractCatalog {
protected void createTableImpl(Identifier identifier, Schema schema) {
try {
boolean tableExists =
- clients.run(
- (client ->
- client.tableExists(
- identifier.getDatabaseName(),
- identifier.getTableName())));
+ clients()
+ .run(
+ (client ->
+ client.tableExists(
+
identifier.getDatabaseName(),
+
identifier.getTableName())));
if (tableExists) {
throw new RuntimeException(
"Table "
@@ -1097,11 +1120,15 @@ public class HiveCatalog extends AbstractCatalog {
}
try {
- clients.execute(
- client ->
- client.createTable(
- createHiveTable(
- identifier, tableSchema, location,
externalTable)));
+ clients()
+ .execute(
+ client ->
+ client.createTable(
+ createHiveTable(
+ identifier,
+ tableSchema,
+ location,
+ externalTable)));
} catch (Exception e) {
try {
if (!externalTable) {
@@ -1185,10 +1212,13 @@ public class HiveCatalog extends AbstractCatalog {
// update location
locationHelper.specifyTableLocation(table, toPath.toString());
- clients.execute(
- client ->
- client.alter_table(
- toTable.getDatabaseName(),
toTable.getTableName(), table));
+ clients()
+ .execute(
+ client ->
+ client.alter_table(
+ toTable.getDatabaseName(),
+ toTable.getTableName(),
+ table));
}
} catch (TException e) {
throw new RuntimeException("Failed to rename table " +
fromTable.getFullName(), e);
@@ -1202,10 +1232,10 @@ public class HiveCatalog extends AbstractCatalog {
try {
String fromDB = fromTable.getDatabaseName();
String fromTableName = fromTable.getTableName();
- Table table = clients.run(client -> client.getTable(fromDB,
fromTableName));
+ Table table = clients().run(client -> client.getTable(fromDB,
fromTableName));
table.setDbName(toTable.getDatabaseName());
table.setTableName(toTable.getTableName());
- clients.execute(client -> client.alter_table(fromDB,
fromTableName, table));
+ clients().execute(client -> client.alter_table(fromDB,
fromTableName, table));
return table;
} catch (TException e) {
@@ -1263,7 +1293,7 @@ public class HiveCatalog extends AbstractCatalog {
Path location = getTableLocation(identifier, table);
// file format is null, because only data table support alter table.
updateHmsTable(table, identifier, newSchema, null, location);
- clients.execute(client -> HiveAlterTableUtils.alterTable(client,
identifier, table));
+ clients().execute(client -> HiveAlterTableUtils.alterTable(client,
identifier, table));
}
@Override
@@ -1307,7 +1337,7 @@ public class HiveCatalog extends AbstractCatalog {
// tables from file system
List<String> tables;
try {
- Database database = clients.run(client ->
client.getDatabase(databaseName));
+ Database database = clients().run(client ->
client.getDatabase(databaseName));
tables = listTablesInFileSystem(new
Path(locationHelper.getDatabaseLocation(database)));
} catch (Exception e) {
throw new RuntimeException(e);
@@ -1357,7 +1387,7 @@ public class HiveCatalog extends AbstractCatalog {
usingExternalTable(tableSchema.options()));
}
Table finalNewTable = newTable;
- clients.execute(client -> client.createTable(finalNewTable));
+ clients().execute(client -> client.createTable(finalNewTable));
}
// repair partitions
@@ -1400,10 +1430,12 @@ public class HiveCatalog extends AbstractCatalog {
public Table getHmsTable(Identifier identifier)
throws TableNotExistException, TableNoPermissionException {
try {
- return clients.run(
- client ->
- client.getTable(
- identifier.getDatabaseName(),
identifier.getTableName()));
+ return clients()
+ .run(
+ client ->
+ client.getTable(
+ identifier.getDatabaseName(),
+ identifier.getTableName()));
} catch (NoSuchObjectException e) {
throw new TableNotExistException(identifier);
} catch (TException e) {
@@ -1667,7 +1699,7 @@ public class HiveCatalog extends AbstractCatalog {
@VisibleForTesting
public IMetaStoreClient getHmsClient() {
try {
- return clients.run(client -> client);
+ return clients().run(client -> client);
} catch (Exception e) {
throw new RuntimeException("Failed to close hms client:", e);
}
@@ -1691,7 +1723,7 @@ public class HiveCatalog extends AbstractCatalog {
HiveCatalogLock lock =
new HiveCatalogLock(
- clients,
+ clients(),
HiveCatalogLock.checkMaxSleep(hiveConf),
HiveCatalogLock.acquireTimeout(hiveConf));
return Lock.fromCatalog(lock, identifier).runWithLock(callable);
diff --git a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/pool/TestCachedClientPool.java b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/pool/TestCachedClientPool.java
index 6108633398..596ccf5c19 100644
--- a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/pool/TestCachedClientPool.java
+++ b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/pool/TestCachedClientPool.java
@@ -417,12 +417,15 @@ public class TestCachedClientPool {
String metastoreClientClass =
"org.apache.hadoop.hive.metastore.HiveMetaStoreClient";
- return new HiveCatalog(
- fileIO,
- hiveConf,
- metastoreClientClass,
-
CatalogContext.create(options),
- warehouse);
+ HiveCatalog hCata =
+ new HiveCatalog(
+ fileIO,
+ hiveConf,
+
metastoreClientClass,
+
CatalogContext.create(options),
+ warehouse);
+ hCata.listDatabases();
+ return hCata;
} catch (Exception e) {
throw new RuntimeException(e);
}